(ns metabase.driver.mongo.query-processor
  "Logic for translating MBQL queries into Mongo Aggregation Pipeline queries. See
  https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/ for more details."
  (:refer-clojure :exclude [some mapv select-keys empty?])
  (:require
   [clojure.set :as set]
   [clojure.string :as str]
   [flatland.ordered.map :as ordered-map]
   [java-time.api :as t]
   [medley.core :as m]
   [metabase.driver :as driver]
   [metabase.driver-api.core :as driver-api]
   [metabase.driver.common :as driver.common]
   [metabase.driver.mongo.operators :refer [$add $addFields $addToSet $and
                                            $avg $concat $cond $dayOfMonth
                                            $dayOfWeek $dayOfYear $divide $eq
                                            $expr $group $gt $gte $hour $limit
                                            $literal $lookup $lt $lte $match
                                            $max $min $minute $mod $month
                                            $multiply $ne $not $or $project
                                            $regexMatch $second
                                            $setWindowFields $size $skip $sort
                                            $strcasecmp $subtract $sum
                                            $toBool $toLower $unwind $year]]
   [metabase.driver.util :as driver.u]
   [metabase.util :as u]
   [metabase.util.date-2 :as u.date]
   [metabase.util.i18n :refer [tru]]
   [metabase.util.log :as log]
   [metabase.util.malli :as mu]
   [metabase.util.performance :as perf :refer [some mapv select-keys empty?]])
  (:import
   (org.bson BsonBinarySubType)
   (org.bson.types Binary ObjectId)))

;; Ask the compiler to warn about Java interop calls in this namespace that it can only resolve via reflection.
(set! *warn-on-reflection* true)

;;; +----------------------------------------------------------------------------------------------------------------+
;;; |                                                     Schema                                                     |
;;; +----------------------------------------------------------------------------------------------------------------+

;; this is just a very limited schema to make sure we're generating valid queries. We should expand it more in the
;; future

(def ^:private $ProjectStage
  "A `$project` stage: non-blank string keys mapped to anything."
  [:map-of [:= $project] [:map-of driver-api/schema.common.non-blank-string :any]])

(def ^:private $SortStage
  "A `$sort` stage: non-blank string keys mapped to a direction of `-1` or `1`."
  [:map-of [:= $sort] [:map-of driver-api/schema.common.non-blank-string [:enum -1 1]]])

(def ^:private $MatchStage
  "A `$match` stage: keys are non-blank strings or keywords, and a top-level `$not` key (keyword or string form)
  is rejected."
  [:map-of
   [:= $match]
   [:map-of
    [:and
     [:or driver-api/schema.common.non-blank-string :keyword]
     [:fn
      {:error/message "not a $not condition"}
      (fn [k] (not (contains? #{:$not "$not"} k)))]]
    :any]])

(def ^:private $GroupStage
  "A `$group` stage: non-blank string keys mapped to anything."
  [:map-of [:= $group] [:map-of driver-api/schema.common.non-blank-string :any]])

(def ^:private $AddFieldsStage
  "An `$addFields` stage: non-blank string keys mapped to anything."
  [:map-of [:= $addFields] [:map-of driver-api/schema.common.non-blank-string :any]])

(def ^:private $LookupStage
  "A `$lookup` stage: keyword or string keys mapped to anything."
  [:map-of [:= $lookup] [:map-of [:or :keyword :string] :any]])

(def ^:private $UnwindStage
  "An `$unwind` stage: keyword or string keys mapped to anything."
  [:map-of [:= $unwind] [:map-of [:or :keyword :string] :any]])

(def ^:private $LimitStage
  "A `$limit` stage: the value must satisfy `pos-int?`."
  [:map-of [:= $limit] pos-int?])

(def ^:private $SkipStage
  "A `$skip` stage: the value must satisfy `pos-int?`."
  [:map-of [:= $skip] pos-int?])

(def ^:private $SetWindowFieldsStage
  "A `$setWindowFields` stage: non-blank string keys mapped to anything."
  [:map-of [:= $setWindowFields] [:map-of driver-api/schema.common.non-blank-string :any]])

(def ^:private Stage
  "Schema for one pipeline stage: a map with exactly one entry. The stage-specific schema is chosen by
  dispatching on that entry's key."
  [:and
   :map
   [:fn
    {:error/message "map with a single key"}
    (fn [stage] (= 1 (count stage)))]
   (into [:multi {:dispatch (comp first keys)}]
         [[$project         $ProjectStage]
          [$sort            $SortStage]
          [$group           $GroupStage]
          [$addFields       $AddFieldsStage]
          [$lookup          $LookupStage]
          [$unwind          $UnwindStage]
          [$match           $MatchStage]
          [$limit           $LimitStage]
          [$skip            $SkipStage]
          [$setWindowFields $SetWindowFieldsStage]])])

(def ^:private Pipeline
  "Schema for an aggregation pipeline: a sequential collection in which every element is a [[Stage]]."
  [:sequential Stage])

(def Projections
  "Schema for the `:projections` generated by the functions in this namespace: a sequential collection of strings,
  namely the column names returned in an MBQL query, e.g.

    [\"_id\" \"date\" \"user_id\" \"venue_id\"]"
  [:sequential :string])

;;; +----------------------------------------------------------------------------------------------------------------+
;;; |                                                    QP Impl                                                     |
;;; +----------------------------------------------------------------------------------------------------------------+

;; TODO - We already have a *query* dynamic var in metabase.query-processor.interface. Do we need this one too?
;;
;; Code in this namespace reads both `*query*` itself and `(:query *query*)`, so the bound value is expected to be
;; a map with a `:query` key. NOTE(review): the binding site is not visible from this part of the file -- confirm.
(def ^:dynamic ^:private *query* nil)

(def ^:dynamic ^:private *nesting-level*
  "Tracks the depth of nesting at which [[mbql->native-rec]] is currently operating. Defaults to 0.
  It is needed, for example, by the `->lvalue :aggregation` method, which passes it to
  `driver-api/aggregation-at-index`."
  0)

(def ^:dynamic ^:private *next-alias-index*
  "Tracks the index of the next alias for join compilation. It is bound in [[mbql->native]] to a `volatile!`
   holding 0, so every compilation starts with a fresh 0. The indices are used in [[handle-join]] to make aliases
   unique. Index values are obtained through [[next-alias-index]], which increments before returning, so the
   first index handed out is 1. The root value is nil: calling [[next-alias-index]] outside such a binding fails."
  nil)

(defn- next-alias-index
  "Bump the counter held in the volatile bound to [[*next-alias-index*]] and return the bumped value.
   See the [[*next-alias-index*]] docstring for the surrounding context."
  []
  (let [counter *next-alias-index*]
    (vreset! counter (inc @counter))))

(def ^:dynamic ^:private *field-mappings*
  "The mapping from fields to the projected names created by the nested query. [[find-mapped-field-name]] looks
  field clauses up as keys and returns the mapped value. Defaults to an empty map."
  {})

(defn- find-mapped-field-name
  "Find the projected name recorded in [[*field-mappings*]] for `field`, if any.
  An exact lookup of the whole clause is tried first. If that misses, the mappings are scanned for a vector key
  that is a `:field` clause with the same ID/name whose options agree on both `:join-alias` and `::join-local`.
  Note that while joins are being compiled, a field's `:join-alias` is renamed to `::join-local` so that the
  fields of the join currently being compiled are not prefixed with the join alias."
  [[_ field-id params :as field]]
  (letfn [(same-field? [candidate]
            (and (vector? candidate)
                 (= [:field field-id] (subvec candidate 0 2))
                 (let [candidate-opts (candidate 2)]
                   (and (= (:join-alias candidate-opts) (:join-alias params))
                        (= (::join-local candidate-opts) (::join-local params))))))]
    (or (get *field-mappings* field)
        (some (fn [[candidate mapped-name]]
                (when (same-field? candidate)
                  mapped-name))
              *field-mappings*))))

(defn- get-join-alias
  "Compute the name of the join field used for `join-alias`; returns nil when `join-alias` is nil.
  Join aliases are assumed to be unique within the query (the escape-join-aliases middleware ensures this), so
  the alias is simply given a fixed `join_alias_` prefix. The prefix makes it less likely that the join field we
  introduce in the $unwind stage overwrites a field of the document being joined to."
  [join-alias]
  (when (some? join-alias)
    (str "join_alias_" join-alias)))

(defn- get-mongo-version
  "Return what `driver/dbms-version` reports for the database of the current metadata provider, going through
  `driver-api/cached` under the `::version` key. Callers in this namespace read `:version` and
  `:semantic-version` from the result."
  []
  (driver-api/cached ::version
                     (let [database (driver-api/database (driver-api/metadata-provider))]
                       (driver/dbms-version :mongo database))))

(defmulti ^:private ->rvalue
  "Format this `Field` or value for use as the right-hand value of an expression, e.g. by adding `$` to a `Field`'s
  name. Dispatches on `driver-api/dispatch-by-clause-name-or-class`; the `:default` method returns its argument
  unchanged."
  {:arglists '([x])}
  driver-api/dispatch-by-clause-name-or-class)

(defmulti ^:private ->lvalue
  "Return an escaped name that can be used as the name of a given Field. Dispatches on
  `driver-api/dispatch-by-clause-name-or-class`."
  {:arglists '([field])}
  driver-api/dispatch-by-clause-name-or-class)

(defn- col->name-components
  "Return the names on the path to `_col`, outermost ancestor first and the column's own `:name` last.
  Ancestors are found by following `:parent-id` through the metadata provider."
  [{:keys [parent-id], field-name :name, :as _col}]
  ;; TODO (Cam 8/11/25) -- this should be using `:nfc-path` instead of looking this up the hard way
  (let [ancestor-names (when parent-id
                         (col->name-components (driver-api/field (driver-api/metadata-provider) parent-id)))]
    (concat ancestor-names [field-name])))

(mu/defn field->name
  "Return a single string name for column metadata `col`. For nested fields this is a combined qualified name:
  the name components of `col` joined with `separator`, which is `.` when not given."
  ([col]
   (field->name col \.))

  ([col       :- driver-api/schema.metadata.column
    separator :- [:or :string char?]]
   (->> (col->name-components col)
        (str/join separator))))

(defmacro ^:private mongo-let
  "Expand to a `{:$let {:vars ... :in ...}}` map that declares one variable.

  `field` is a symbol and `value` the form the variable is set to: `:vars` becomes `{<keyword of field> value}`.
  `body` is evaluated inside a Clojure `let` in which the symbol `field` is bound to the keyword `:$$<field>`,
  so the body can refer to the variable by name; the value of `body` becomes `:in`.

  e.g. `(mongo-let [x 1] {:k x})` expands to `{:$let {:vars {:x 1} :in (let [x :$$x] {:k x})}}`."
  {:style/indent 1}
  [[field value] & body]
  {:$let {:vars {(keyword field) value}
          :in   `(let [~field ~(keyword (str "$$" (name field)))]
                   ~@body)}})

;; Forward declaration: `with-rvalue-temporal-bucketing` is used by code that precedes its definition.
(declare with-rvalue-temporal-bucketing)

(defn- scope-with-join-field
  "Adjust `field-name` for fields coming from joins. For use in `->[lr]value` for `:field` and `:metadata/column`.
  `source-alias` wins over `field-name` when present; when `join-field` is truthy the result is prefixed with
  it and a `.`."
  [field-name join-field source-alias]
  (let [base-name (or source-alias field-name)]
    (if join-field
      (str join-field \. base-name)
      base-name)))

;; Column metadata: the qualified column name, scoped by the `::join-field` / `::source-alias` that the `:field`
;; methods attach to the metadata.
(defmethod ->lvalue :metadata/column
  [col]
  (let [{::keys [join-field source-alias]} col]
    (scope-with-join-field (field->name col) join-field source-alias)))

;; Expression refs: prefer the desired alias recorded in the options, falling back to the expression's name.
(defmethod ->lvalue :expression
  [[_ expression-name opts]]
  (if-let [desired-alias (get opts driver-api/qp.add.desired-alias)]
    desired-alias
    expression-name))

;; Anything without a more specific method is already usable as an rvalue and passes through untouched.
(defmethod ->rvalue :default
  [value]
  value)

;; Expression refs compile to the rvalue of the expression's definition, looked up by name in `(:query *query*)`.
;; When the definition is a `:value` clause the compiled result is wrapped in a `$literal` map.
(defmethod ->rvalue :expression
  [[_ expression-name]]
  (let [definition (driver-api/expression-with-name (:query *query*) expression-name)
        compiled   (->rvalue definition)]
    (if (driver-api/is-clause? :value definition)
      (array-map $literal compiled)
      compiled)))

(def ^:private base64-decoder
  "Source of a JavaScript function, passed as the `:body` of `$function`, that turns a binary value into a
  string by base64-decoding the result of `bin.base64()`. Each decoded byte becomes one character. Returns null
  for a falsy input or if anything throws.

  The final group of a base64 string may have only 2 or 3 characters once the `=` padding is stripped. In
  JavaScript `charAt` past the end of a string returns `''` and `indexOf('')` returns 0 rather than -1, so
  missing positions are detected by length (`hasC` / `hasD`); otherwise spurious NUL characters would be
  appended to the decoded string."
  "
function(bin) {
          if (!bin) return null;

          try {
            var base64 = bin.base64();

            // Manual base64 decode implementation
            var chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';
            var result = '';
            var i = 0;

            // Remove any padding
            base64 = base64.replace(/=+$/, '');

            while (i < base64.length) {
              // Whether the 3rd and 4th characters of this group actually exist.
              var hasC = i + 2 < base64.length;
              var hasD = i + 3 < base64.length;

              var a = chars.indexOf(base64.charAt(i++));
              var b = chars.indexOf(base64.charAt(i++));
              var c = chars.indexOf(base64.charAt(i++));
              var d = chars.indexOf(base64.charAt(i++));

              var bitmap = (a << 18) | (b << 12) | (c << 6) | d;

              result += String.fromCharCode((bitmap >> 16) & 255);
              if (hasC && c !== -1) result += String.fromCharCode((bitmap >> 8) & 255);
              if (hasD && d !== -1) result += String.fromCharCode(bitmap & 255);
            }

            return result;
          } catch(e) {
            return null;
          }
        }
")

;; Column metadata compiles to a `$`-prefixed reference to the (join-scoped) column name. Unless the column is
;; marked `::inherited?`, its `:coercion-strategy` decides how that reference is wrapped. Strategies are tested
;; with `isa?` in the order written below; the first match wins.
(defmethod ->rvalue :metadata/column
  [{coercion :coercion-strategy, ::keys [source-alias join-field inherited?] :as field}]
  (let [path        (str \$ (scope-with-join-field (field->name field) join-field source-alias))
        strategy    (if inherited? nil coercion)
        coerces?    (fn [parent] (isa? strategy parent))
        ;; date built by adding `ms` milliseconds to the start of 1970, UTC
        from-millis (fn [ms] {:$dateFromParts {:millisecond ms, :year 1970, :timezone "UTC"}})
        ;; the column's bytes decoded to a string by the JS helper
        decoded     {"$function"
                     {:body base64-decoder
                      :args [path]
                      :lang "js"}}
        unsupported (fn [message]
                      (ex-info message
                               {:type              driver-api/qp.error-type.unsupported-feature
                                :coercion-strategy strategy}))]
    (cond
      (coerces? :Coercion/UNIXNanoSeconds->DateTime)
      (from-millis {$divide [path 1000000]})

      (coerces? :Coercion/UNIXMicroSeconds->DateTime)
      (from-millis {$divide [path 1000]})

      (coerces? :Coercion/UNIXMilliSeconds->DateTime)
      (from-millis path)

      (coerces? :Coercion/UNIXSeconds->DateTime)
      {:$dateFromParts {:second path, :year 1970, :timezone "UTC"}}

      (coerces? :Coercion/YYYYMMDDHHMMSSString->Temporal)
      {"$dateFromString" {:dateString path
                          :format     "%Y%m%d%H%M%S"
                          :onError    path}}

      (coerces? :Coercion/YYYYMMDDHHMMSSBytes->Temporal)
      {"$dateFromString" {:dateString decoded
                          :format     "%Y%m%d%H%M%S"
                          :onError    path}}

      (coerces? :Coercion/ISO8601Bytes->Temporal)
      {"$dateFromString" {:dateString decoded
                          :onError    path}}

      ;; mongo only supports datetime
      (coerces? :Coercion/ISO8601->DateTime)
      {"$dateFromString" {:dateString path
                          :onError    path}}

      (coerces? :Coercion/ISO8601->Date)
      (throw (unsupported
              (tru "MongoDB does not support parsing strings as dates. Try parsing to a datetime instead")))

      (coerces? :Coercion/ISO8601->Time)
      (throw (unsupported
              (tru "MongoDB does not support parsing strings as times. Try parsing to a datetime instead")))

      (coerces? :Coercion/DateTime->Date)
      (with-rvalue-temporal-bucketing path :day)

      (coerces? :Coercion/String->Float)
      {"$toDouble" path}

      (coerces? :Coercion/String->Integer)
      {"$toLong" path}

      (coerces? :Coercion/Float->Integer)
      {"$toLong" {"$round" {"$toDouble" path}}}

      :else
      path)))

;; An `[:aggregation index]` ref is not something a value gets assigned to, e.g. there is no
;;
;;    aggregations[0] = 20
;;
;; Its lvalue is the *name* of the aggregation found at `index` for the current `*nesting-level*`.
(defmethod ->lvalue :aggregation
  [[_ index]]
  (let [aggregation (driver-api/aggregation-at-index *query* index *nesting-level*)]
    (driver-api/aggregation-name (:query *query*) aggregation)))

;; `:field` refs. A name-based ref is scoped directly from its name. An integer ID first consults the field
;; mappings, and otherwise resolves the column metadata and delegates to the `:metadata/column` method.
(defmethod ->lvalue :field
  [[_ id-or-name {:keys [join-alias] :as opts} :as field]]
  (if-not (integer? id-or-name)
    (scope-with-join-field (name id-or-name) (get-join-alias join-alias) (driver-api/qp.add.source-alias opts))
    (or (find-mapped-field-name field)
        (-> (driver-api/field (driver-api/metadata-provider) id-or-name)
            (assoc ::source-alias (driver-api/qp.add.source-alias opts)
                   ::join-field (get-join-alias join-alias))
            ->lvalue))))

(defn- add-start-of-week-offset
  "Shift the day-of-week expression `expr` by `offset` days, modulo 7. A negative `offset` is first raised by
  multiples of 7 until it is non-negative. A zero offset returns `expr` untouched."
  [expr offset]
  (loop [shift offset]
    (cond
      (neg? shift)  (recur (+ shift 7))
      (zero? shift) expr
      :else         {$mod [{$add [expr shift]}
                           7]})))

(defn- day-of-week
  "Expression for the day of week of `column` in the results timezone, shifted by the start-of-week offset.
  The shifted value is computed once in a `$let` variable, and a shifted value of 0 is reported as 7."
  [column]
  (let [shifted (add-start-of-week-offset
                 {$dayOfWeek {:date column :timezone (driver-api/results-timezone-id)}}
                 (driver.common/start-of-week-offset :mongo))
        dow-var :$$day_of_week]
    {:$let {:vars {:day_of_week shifted}
            :in   {$cond {:if   {$eq [dow-var 0]}
                          :then 7
                          :else dow-var}}}}))

(defn- week
  "Expression that subtracts `(day-of-week - 1)` days, expressed in milliseconds, from `column`."
  [column]
  (let [ms-per-day       (* 24 60 60 1000)
        days-into-week   {$subtract [(day-of-week column)
                                     1]}
        millis-into-week {$multiply [days-into-week
                                     ms-per-day]}]
    {$subtract [column
                millis-into-week]}))

(defn- truncate-to-resolution
  "Truncate `column` to `resolution` by splitting it with `$dateToParts` (results timezone) and rebuilding it
  with `$dateFromParts` from only the parts listed before `resolution` plus `resolution` itself."
  [column resolution]
  (mongo-let [parts {:$dateToParts {:timezone (driver-api/results-timezone-id)
                                    :date column}}]
    (let [coarse->fine [:year :month :day :hour :minute :second :millisecond]
          kept-parts   (concat (take-while #(not= resolution %) coarse->fine)
                               [resolution])
          ;; [part "$$parts.<part>"]
          part-entry   (fn [part] [part (str (name parts) \. (name part))])]
      {:$dateFromParts (into {:timezone (driver-api/results-timezone-id)}
                             (map part-entry)
                             kept-parts)})))

(defn- days-till-start-of-first-full-week
  "Expression for `8 - <day of week of the start of `column`'s year>`."
  [column]
  (let [new-year-day-of-week (-> column
                                 (with-rvalue-temporal-bucketing :year)
                                 (with-rvalue-temporal-bucketing :day-of-week))]
    {:$subtract [8 new-year-day-of-week]}))

(defn- week-of-year
  "Week-of-year expression for `column`. `mode` must be `:us` (weeks start on Sunday) or `:instance`
  (`driver.common/*start-of-week*` is bound to nil). The arithmetic is explained in full in
  [[metabase.driver.sql.query-processor/week-of-year]]."
  [column mode]
  (let [day-of-year     (with-rvalue-temporal-bucketing column :day-of-year)
        week-start      (case mode
                          :us       :sunday
                          :instance nil)
        days-until-full (binding [driver.common/*start-of-week* week-start]
                          (days-till-start-of-first-full-week column))]
    {:$toInt {:$add [1 {:$ceil {:$divide [{:$subtract [day-of-year days-until-full]} 7]}}]}}))

(defn- extract
  "Apply the date operator `op` to `column`, evaluated in the results timezone."
  [op column]
  (let [date-args {:date column :timezone (driver-api/results-timezone-id)}]
    {op date-args}))

(defn- with-rvalue-temporal-bucketing
  "Wrap the rvalue `field` so that it is truncated to, or has a component extracted for, the temporal `unit`.

  A `unit` of `:default` returns `field` unchanged. Truncating units use `$dateTrunc` when the database's
  semantic version is at least 5 and fall back to rebuilding the date from its parts otherwise. A `unit` that is
  not listed in the `case` below throws, because the `case` has no default clause."
  [field unit]
  (if (= unit :default)
    field
    (let [supports-dateTrunc? (-> (get-mongo-version)
                                  :semantic-version
                                  (driver.u/semantic-version-gte [5]))
          column field]
      ;; `truncate` cuts `column` down to the given unit, preferring the native `$dateTrunc` when available.
      (letfn [(truncate [unit]
                (if supports-dateTrunc?
                  {:$dateTrunc {:date column
                                :unit (name unit)
                                :timezone (driver-api/results-timezone-id)
                                :startOfWeek (name (driver-api/start-of-week))}}
                  (truncate-to-resolution column unit)))]
        (case unit
          ;; never reached: `:default` is already handled by the `if` above
          :default          column
          :second-of-minute (extract $second column)
          :minute           (truncate :minute)
          :minute-of-hour   (extract $minute column)
          :hour             (truncate :hour)
          :hour-of-day      (extract $hour column)
          :day              (truncate :day)
          :day-of-week      (day-of-week column)
          :day-of-week-iso  (binding [driver.common/*start-of-week* :monday]
                              (day-of-week column))
          :day-of-month     (extract $dayOfMonth column)
          :day-of-year      (extract $dayOfYear column)
          ;; without `$dateTrunc`: move back to the first day of the week, then drop the time of day
          :week             (if supports-dateTrunc?
                              (truncate :week)
                              (truncate-to-resolution (week column) :day))
          ;; ceil(day-of-year of the week's first day / 7)
          :week-of-year     (let [week-start (if supports-dateTrunc?
                                               (truncate :week)
                                               (week column))]
                              {:$ceil {$divide [{$dayOfYear week-start}
                                                7.0]}})
          :week-of-year-iso (extract :$isoWeek column)
          :week-of-year-us  (week-of-year column :us)
          :week-of-year-instance  (week-of-year column :instance)
          :month            (truncate :month)
          :month-of-year    (extract $month column)
          ;; Without `$dateTrunc`, the quarter is rebuilt with `$dateFromParts` from the year and the first month
          ;; of the quarter only. That month is `month - ((month + 2) mod 3)`, which maps months 1-3 to 1, 4-6 to
          ;; 4, 7-9 to 7 and 10-12 to 10.
          :quarter
          (if supports-dateTrunc?
            (truncate :quarter)
            (mongo-let [#_{:clj-kondo/ignore [:unused-binding]} parts {:$dateToParts {:date column :timezone (driver-api/results-timezone-id)}}]
              {:$dateFromParts {:year  :$$parts.year
                                :month {$subtract [:$$parts.month
                                                   {$mod [{$add [:$$parts.month 2]}
                                                          3]}]}
                                :timezone (driver-api/results-timezone-id)}}))

          ;; ceil(month / 3)
          :quarter-of-year
          {:$toInt {:$ceil {$divide [(extract $month column) 3.0]}}}

          :year
          (truncate :year)

          :year-of-era
          (extract $year column))))))

;; `:field` refs. A name found in the field mappings always wins. Otherwise an integer ID is resolved to column
;; metadata (marked `::inherited?` unless the ref's source table is a positive integer) and a name-based ref is
;; scoped directly. Temporal bucketing is applied last, when the ref carries a `:temporal-unit`.
(defmethod ->rvalue :field
  [[_ id-or-name {:keys [temporal-unit join-alias] :as opts} :as field]]
  (let [join-field   (get-join-alias join-alias)
        source-alias (driver-api/qp.add.source-alias opts)
        mapped       (find-mapped-field-name field)
        reference    (cond
                       mapped
                       (str \$ mapped)

                       (integer? id-or-name)
                       (->rvalue (assoc (driver-api/field (driver-api/metadata-provider) id-or-name)
                                        ::source-alias source-alias
                                        ::join-field   join-field
                                        ::inherited?   (not (pos-int? (driver-api/qp.add.source-table opts)))))

                       :else
                       (str \$ (scope-with-join-field (name id-or-name) join-field source-alias)))]
    (if temporal-unit
      (with-rvalue-temporal-bucketing reference temporal-unit)
      reference)))

;; Values clauses below; they only need to implement `->rvalue`

;; nil compiles to nil
(defmethod ->rvalue nil [_] nil)

(defn- uuid->bsonbinary
  "Convert the `java.util.UUID` `u` to a BSON `Binary` of subtype `UUID_STANDARD`. The 16 bytes are the most
  significant long followed by the least significant long, as written by `ByteBuffer.putLong`."
  [u]
  (let [^java.util.UUID uuid u
        ;; a UUID is 128 bits long
        buffer (doto (java.nio.ByteBuffer/allocate 16)
                 (.putLong (.getMostSignificantBits uuid))
                 (.putLong (.getLeastSignificantBits uuid)))]
    (Binary. BsonBinarySubType/UUID_STANDARD (.array buffer))))

;; `:value` clauses. nil and "" are returned as they are. Other values are converted according to the
;; `:base_type` in the clause's options.
(defmethod ->rvalue :value
  [[_ value {base-type :base_type}]]
  (letfn [(as-uuid-binary []
            (try
              (uuid->bsonbinary (java.util.UUID/fromString (str value)))
              (catch IllegalArgumentException _
                ;; Allow comparison with non-UUID values for things like string search
                value)))]
    (cond
      ;; Passing nil or "" to the ObjectId or Binary constructor throws an exception
      (nil? value) value
      (= value "") value

      (isa? base-type :type/MongoBSONID)  (ObjectId. (str value))
      (isa? base-type :type/MongoBinData) (as-uuid-binary)
      :else                               value)))

(defn- $date-from-string
  "Build a `$dateFromString` expression whose `:dateString` is the `str` of `s`."
  [s]
  (let [date-string (str s)]
    {:$dateFromString {:dateString date-string}}))

;; `:absolute-datetime` literals. Offset and zoned values are first converted using the report timezone ("UTC"
;; when none is supported); local values are used as they are. Truncating units produce a `$dateFromString` of
;; the bucketed value, and extraction units produce what `u.date/extract` returns. A temporal class or a unit
;; that is not listed below throws, since neither the `condp` nor the `case` has a default clause.
(defmethod ->rvalue :absolute-datetime
  [[_ t unit]]
  (let [report-zone (t/zone-id (or (driver-api/report-timezone-id-if-supported :mongo (driver-api/database (driver-api/metadata-provider)))
                                   "UTC"))
        temporal    (condp = (class t)
                      java.time.LocalDate      t
                      java.time.LocalTime      t
                      java.time.LocalDateTime  t
                      java.time.OffsetTime     (t/offset-time t report-zone)
                      java.time.OffsetDateTime (t/offset-date-time t report-zone)
                      java.time.ZonedDateTime  (t/offset-date-time t report-zone))
        extracted   (fn [component]
                      (u.date/extract temporal component))
        bucketed    (fn [resolution]
                      ($date-from-string (u.date/bucket temporal resolution)))
        unit        (or unit :default)]
    (case unit
      :default
      ($date-from-string temporal)

      (:minute :hour :day :week :month :quarter :year)
      (bucketed unit)

      (:minute-of-hour :hour-of-day :day-of-week :day-of-month :day-of-year
       :week-of-year :month-of-year :quarter-of-year)
      (extracted unit))))

;; `:relative-datetime`: the current instant in the report timezone ("UTC" when none is supported). Unless `unit`
;; is `:default`, it is shifted by `amount` units and then bucketed to `unit`. Emitted as a `$dateFromString` of
;; the resulting offset date-time.
(defmethod ->rvalue :relative-datetime
  [[_ amount unit]]
  (let [zone    (t/zone-id (or (driver-api/report-timezone-id-if-supported :mongo (driver-api/database (driver-api/metadata-provider)))
                               "UTC"))
        now     (t/with-zone-same-instant (t/zoned-date-time) zone)
        shifted (if (= unit :default)
                  now
                  (u.date/bucket (u.date/add now unit amount) unit))]
    ($date-from-string (t/offset-date-time shifted))))

;;; ---------------------------------------------------- functions ---------------------------------------------------

;; It doesn't make 100% sense to have lvalues for all these but it's a formal requirement

;; Clauses taking a single input: the lvalue is the lvalue of that input.
(doseq [tag [:avg :stddev :var :sum :min :max
             :floor :ceil :round :abs
             :log :exp :sqrt
             :trim :ltrim :rtrim :upper :lower :length]]
  (defmethod ->lvalue tag
    [[_ inp]]
    (->lvalue inp)))

;; Clauses taking several arguments: the lvalue is the lvalue of the first argument.
(doseq [tag [:power :replace :concat :substring
             :+ :- :* :/
             :coalesce]]
  (defmethod ->lvalue tag
    [[_ & args]]
    (->lvalue (first args))))

;; Aggregation functions: each wraps the rvalue of its input in the matching accumulator operator.
(defmethod ->rvalue :avg       [[_ inp]] {$avg (->rvalue inp)})
;; `:stddev` must be the *population* standard deviation, consistent with `STDDEV_POP` in Metabase's SQL
;; drivers, so `$stdDevPop` is used rather than the sample estimator `$stdDevSamp`.
(defmethod ->rvalue :stddev    [[_ inp]] {"$stdDevPop" (->rvalue inp)})
(defmethod ->rvalue :sum       [[_ inp]] {"$sum" (->rvalue inp)})
(defmethod ->rvalue :min       [[_ inp]] {$min (->rvalue inp)})
(defmethod ->rvalue :max       [[_ inp]] {$max (->rvalue inp)})

;; Single-input functions of the shape {operator <input>}.
(doseq [[tag operator] [[:floor  "$floor"]
                        [:ceil   "$ceil"]
                        [:round  "$round"]
                        [:abs    "$abs"]
                        [:log    "$log10"]
                        [:exp    "$exp"]
                        [:sqrt   "$sqrt"]
                        [:upper  "$toUpper"]
                        [:lower  "$toLower"]
                        [:length "$strLenCP"]]]
  (defmethod ->rvalue tag
    [[_ inp]]
    {operator (->rvalue inp)}))

;; Trimming functions take their input under an "input" key.
(doseq [[tag operator] [[:trim  "$trim"]
                        [:ltrim "$ltrim"]
                        [:rtrim "$rtrim"]]]
  (defmethod ->rvalue tag
    [[_ inp]]
    {operator {"input" (->rvalue inp)}}))

;; Functions that take all of their compiled arguments as a vector.
(doseq [[tag operator] [[:power  "$pow"]
                        [:concat "$concat"]]]
  (defmethod ->rvalue tag
    [[_ & args]]
    {operator (mapv ->rvalue args)}))

;; `:temporal-extract` applies temporal bucketing for `unit` to the compiled input.
(defmethod ->rvalue :temporal-extract
  [[_ inp unit]]
  (-> (->rvalue inp)
      (with-rvalue-temporal-bucketing unit)))

;; `:replace` compiles to `$replaceAll`. Throws when the database's semantic version is below 4.4.
(defmethod ->rvalue :replace
  [[_ & args]]
  (let [version    (get-mongo-version)
        supported? (driver.u/semantic-version-gte (:semantic-version version) [4 4])]
    (when-not supported?
      (throw (ex-info "Replace requires MongoDB 4.4 or above"
                      {:database-version version})))
    (let [[input pattern replacement] (mapv ->rvalue args)]
      {"$replaceAll" {"input" input "find" pattern "replacement" replacement}})))

;; `:substring` compiles to `$substrCP`. 1 is subtracted from the compiled `idx` to obtain the start position
;; passed to `$substrCP`.
(defmethod ->rvalue :substring
  [[_ & [expr idx cnt]]]
  (let [input  (->rvalue expr)
        start  {"$subtract" [(->rvalue idx) 1]}
        ;; The last argument is not optional in mongo: without a count, take everything from `start` onwards
        length (if (nil? cnt)
                 {"$subtract" [{"$strLenCP" input} start]}
                 (->rvalue cnt))]
    {"$substrCP" [input start length]}))

;; `:/` compiles to nested `$divide` expressions guarded against division by zero.
;;
;; * A literal zero divisor makes the whole expression nil.
;; * Each non-literal divisor gets an `{"$eq" [divisor 0]}` check. If any check holds the result is nil,
;;   otherwise it is the division.
;;
;; The dividend is used as the explicit start value of the fold. It must not be recognised by being falsy: a
;; dividend that compiles to nil (for example a nested division by a literal zero) would otherwise be dropped
;; and the first divisor silently promoted to dividend, so `[:/ [:/ 1 0] 2]` would compile to `2`.
(defmethod ->rvalue :/
  [[_ & [_ & divisors :as args]]]
  ;; division works outside in (/ 1 2 3) => (/ (/ 1 2) 3)
  (let [[dividend & compiled-divisors] (map ->rvalue args)
        division (reduce
                  (fn [accum divisor]
                    {"$divide" [accum divisor]})
                  dividend
                  compiled-divisors)
        literal-zero? (some #(and (number? %) (zero? %)) divisors)
        non-literal-nil-checks (mapv (fn [divisor] {"$eq" [(->rvalue divisor) 0]}) (remove number? divisors))]
    (cond
      literal-zero?
      nil

      (empty? non-literal-nil-checks)
      division

      (= 1 (count non-literal-nil-checks))
      {"$cond" [(first non-literal-nil-checks) nil
                division]}

      :else
      {"$cond" [{"$or" non-literal-nil-checks} nil
                division]})))

;;; Intervals are not first-class citizens in Mongo, so they cannot be translated on their own.
;;; The only thing we can do with them is add them to, or subtract them from, a date-valued expression.
;;; Also, date arithmetic with intervals was first implemented in version 5. (Before that, only
;;; ordinary addition could be used: one of the operands of the addition could be a date, and the
;;; rest of the operands had to be integers, which would be treated as milliseconds.)
;;; Because of this, whenever we translate date arithmetic with intervals, we check the major
;;; version of the database and throw a nice exception if it's less than 5.

(defn- check-date-operations-supported
  "Throw if the database's major version is known and lower than 5. Returns nil otherwise, including when no
  major version is available."
  []
  (let [version-info  (get-mongo-version)
        major-version (nth (:semantic-version version-info) 0 nil)]
    (when (and major-version (< major-version 5))
      (throw (ex-info "Date arithmetic not supported in versions before 5"
                      {:database-version (:version version-info)})))))

(defn- interval?
  "True when `expr` is a vector whose first element is `:interval`."
  [expr]
  (if (vector? expr)
    (= :interval (first expr))
    false))

(defn- summarize-interval
  "Apply the date operator `op` to `date-expr` with the amount and unit taken from an `[:interval amount unit]`
  clause."
  [op date-expr interval]
  (let [[_ amount unit] interval]
    {op {:startDate date-expr
         :unit      unit
         :amount    amount}}))

(defn- summarize-num-or-interval
  "Reducing step for date/number arithmetic: fold the MBQL operand `mbql-expr` into the already compiled
  `mongo-expr`.

  * An interval operand is applied with `date-op` (see [[summarize-interval]]).
  * If `mongo-expr` is already a `number-op` map, the compiled operand is appended to its argument vector.
  * Otherwise a new `{number-op [mongo-expr operand]}` map is created.

  `mongo-expr` is whatever `->rvalue` produced and is often not a map (a plain field reference is a string such
  as \"$field\"). `contains?` throws `IllegalArgumentException` when given a string together with a non-numeric
  key, so the map check has to come first."
  [number-op date-op mongo-expr mbql-expr]
  (cond
    (interval? mbql-expr)
    (summarize-interval date-op mongo-expr mbql-expr)

    (and (map? mongo-expr) (contains? mongo-expr number-op))
    (update mongo-expr number-op conj (->rvalue mbql-expr))

    :else
    {number-op [mongo-expr (->rvalue mbql-expr)]}))

(def ^:private num-or-interval-reducer
  "Reducing functions for `:+` and `:-`, keyed by MBQL operator. Each one is [[summarize-num-or-interval]] with
  the numeric and the date operator for that MBQL operator already supplied."
  {:+ (fn [mongo-expr mbql-expr]
        (summarize-num-or-interval "$add" "$dateAdd" mongo-expr mbql-expr))
   :- (fn [mongo-expr mbql-expr]
        (summarize-num-or-interval "$subtract" "$dateSubtract" mongo-expr mbql-expr))})

;; `:+`. With no interval among `args` this is a plain `$add`.
;; Addition is commutative, and any but not all of `args` may be intervals. The first argument that is not an
;; interval is picked as the base and the remaining arguments are added to it one by one.
;; (It is the caller's responsibility to make sure that this first non-interval argument represents a date and
;; not an offset, as an integer would.)
(defmethod ->rvalue :+ [[_ & args]]
  (if-not (some interval? args)
    {"$add" (mapv ->rvalue args)}
    (let [picked (u/pick-first (complement interval?) args)]
      (when-not picked
        (throw (ex-info "Summing intervals is not supported" {:args args})))
      (let [[base others] picked]
        (check-date-operations-supported)
        (reduce (num-or-interval-reducer :+) (->rvalue base) others)))))

;; `:-`. Subtraction is not commutative, so the first argument cannot be an interval. Only the remaining
;; arguments are checked for intervals. Without any, this is a plain `$subtract` over all compiled arguments.
;; NOTE(review): MongoDB documents `$subtract` as taking two operands -- confirm that clauses with more than two
;; arguments never reach this point.
(defmethod ->rvalue :- [[_ & [arg & others :as args]]]
  (if-not (some interval? others)
    {"$subtract" (mapv ->rvalue args)}
    (do
      (check-date-operations-supported)
      (reduce (num-or-interval-reducer :-) (->rvalue arg) others))))

;; `:*` compiles to `$multiply` over all compiled factors.
(defmethod ->rvalue :*
  [[_ & factors]]
  {"$multiply" (mapv ->rvalue factors)})

;; `:coalesce` compiles to `$ifNull` over all compiled candidates.
(defmethod ->rvalue :coalesce
  [[_ & candidates]]
  {"$ifNull" (mapv ->rvalue candidates)})

;; `:now` compiles to the `$$NOW` variable. Throws when the database does not support the `:now` feature.
(defmethod ->rvalue :now [[_]]
  (when-not (driver/database-supports? :mongo :now (driver-api/database (driver-api/metadata-provider)))
    (throw (ex-info (tru "now is not supported for MongoDB versions before 4.2")
                    {:database-version (:version (get-mongo-version))})))
  "$$NOW")

(defmethod ->rvalue :text
  [[_ expr]]
  ;; Wrap the compiled operand in `$toString`.
  (let [operand (->rvalue expr)]
    {"$toString" operand}))

(defmethod ->rvalue :date [[_ expr]]
  ;; String operands are converted with `$toDate`, anything else is passed through unchanged; the result is
  ;; then bucketed to `:day`.
  (let [rvalue     (->rvalue expr)
        is-string? {"$eq" [{"$type" rvalue} "string"]}
        as-date    {"$cond" [is-string?
                             {"$toDate" rvalue}
                             rvalue]}]
    (with-rvalue-temporal-bucketing as-date :day)))

(defmethod ->rvalue :today
  [[_]]
  ;; "today" is the `:now` clause run through the `:date` clause.
  (let [now-clause [:now]]
    (->rvalue [:date now-clause])))

(defmethod ->rvalue :datetime [[_ expr {:keys [mode]}]]
  ;; Converts the compiled operand to a date according to `mode` (defaulting to `:iso`):
  ;;  * string modes use `$dateFromString`, falling back to the unconverted operand via `:onError`;
  ;;  * `*-bytes` modes first run the operand through the `base64-decoder` JS function;
  ;;  * `unix-*` modes build the date with `$dateFromParts`, counting from the year 1970 in UTC.
  ;; Any other mode is rejected as an unsupported feature.
  (let [rvalue      (->rvalue expr)
        from-string (fn [date-string fmt]
                      {"$dateFromString" (cond-> {:dateString date-string}
                                           fmt  (assoc :format fmt)
                                           true (assoc :onError rvalue))})
        decoded     {"$function" {:body base64-decoder
                                  :args [rvalue]
                                  :lang "js"}}
        from-epoch  (fn [part value]
                      {:$dateFromParts {part value, :year 1970, :timezone "UTC"}})]
    (case (or mode :iso)
      :iso               (from-string rvalue nil)
      :simple            (from-string rvalue "%Y%m%d%H%M%S")
      :simple-bytes      (from-string decoded "%Y%m%d%H%M%S")
      :iso-bytes         (from-string decoded nil)
      :unix-nanoseconds  (from-epoch :millisecond {$divide [rvalue 1000000]})
      :unix-microseconds (from-epoch :millisecond {$divide [rvalue 1000]})
      :unix-milliseconds (from-epoch :millisecond rvalue)
      :unix-seconds      (from-epoch :second rvalue)
      ;; else
      (throw (ex-info (tru "Driver {0} does not support {1}" :mongo mode)
                      {:type driver-api/qp.error-type.unsupported-feature})))))

(defmethod ->rvalue :datetime-add
  [[_ inp amount unit]]
  ;; Fails early when date operations are unsupported, then emits a `$dateAdd` on the compiled input.
  (check-date-operations-supported)
  (let [start (->rvalue inp)]
    {"$dateAdd" {:startDate start
                 :unit      unit
                 :amount    amount}}))

(defmethod ->rvalue :datetime-subtract
  [[_ inp amount unit]]
  ;; Fails early when date operations are unsupported, then emits a `$dateSubtract` on the compiled input.
  (check-date-operations-supported)
  (let [start (->rvalue inp)]
    {"$dateSubtract" {:startDate start
                      :unit      unit
                      :amount    amount}}))

;; Dispatches purely on the third argument (`unit`); `x` and `y` play no part in method selection.
;; The `:datetime-diff` method of `->rvalue` calls this with `x` and `y` already compiled to rvalues.
(defmulti datetime-diff
  "Helper function for ->rvalue for `datetime-diff` clauses."
  {:added "0.46.0" :arglists '([x y unit])}
  (fn [_ _ unit] unit))

(defmethod datetime-diff :year
  [x y _unit]
  ;; Expressed as the month difference divided by 12.
  (let [months (datetime-diff x y :month)]
    {$divide [months 12]}))

(defmethod datetime-diff :quarter
  [x y _unit]
  ;; Expressed as the month difference divided by 3.
  (let [months (datetime-diff x y :month)]
    {$divide [months 3]}))

(defmethod datetime-diff :month
  [x y _unit]
  ;; `$dateDiff` counts crossed month boundaries rather than whole months, so a correction is added:
  ;;  * x < y but x's day-of-month is greater than y's  => subtract one month
  ;;  * x > y but x's day-of-month is less than y's     => add one month
  ;;  * otherwise                                       => no adjustment
  (let [boundaries {"$dateDiff" {:startDate x, :endDate y, :unit "month"}}
        day-x      {$dayOfMonth x}
        day-y      {$dayOfMonth y}
        correction {:$switch {:branches [{:case {:$and [{$lt [x y]}
                                                        {$gt [day-x day-y]}]}
                                          :then -1}
                                         {:case {:$and [{$gt [x y]}
                                                        {$lt [day-x day-y]}]}
                                          :then 1}]
                              :default  0}}]
    {$add [boundaries correction]}))

(defmethod datetime-diff :week
  [x y _unit]
  ;; Expressed as the day difference divided by 7.
  (let [days (datetime-diff x y :day)]
    {$divide [days 7]}))

(defn- simple-datediff
  "Plain `$dateDiff` expression from `x` (start) to `y` (end) measured in `unit`."
  [x y unit]
  (let [spec {:startDate x, :endDate y, :unit unit}]
    {"$dateDiff" spec}))

;; Units for which a plain `$dateDiff` is used as-is.
(defmethod datetime-diff :day
  [x y unit]
  (simple-datediff x y unit))

(defmethod datetime-diff :minute
  [x y unit]
  (simple-datediff x y unit))

(defmethod datetime-diff :second
  [x y unit]
  (simple-datediff x y unit))

(defmethod datetime-diff :hour
  [x y _unit]
  ;; Mongo's `$dateDiff` with the hour unit isn't accurate to the millisecond, so the difference is taken
  ;; in milliseconds and divided by the number of milliseconds in an hour.
  (let [millis {"$dateDiff" {:startDate x, :endDate y, :unit "millisecond"}}]
    {$divide [millis 3600000]}))

(defmethod ->rvalue :datetime-diff
  [[_ x y unit]]
  ;; Fails early when date operations are unsupported, then compiles both operands (in order) and defers
  ;; to the unit-specific [[datetime-diff]] method.
  (check-date-operations-supported)
  (let [start (->rvalue x)
        end   (->rvalue y)]
    (datetime-diff start end unit)))

;;; +----------------------------------------------------------------------------------------------------------------+
;;; |                                               CLAUSE APPLICATION                                               |
;;; +----------------------------------------------------------------------------------------------------------------+

;;; ----------------------------------------------------- filter -----------------------------------------------------

(defmethod ->rvalue ::not
  [[_ value]]
  ;; Wrap the compiled operand in `$not`.
  (let [operand (->rvalue value)]
    {$not operand}))

;; Method selection is delegated to `driver-api/dispatch-by-clause-name-or-class`; the methods in this file
;; are registered under MBQL clause tags such as `:between`, `:=` or `:and`.
(defmulti compile-filter
  "Compile an mbql filter clause to datastructures suitable to query mongo. Note this is not the whole query but just
  compiling the \"where\" clause equivalent."
  {:added "0.39.0" :arglists '([clause])}
  driver-api/dispatch-by-clause-name-or-class)

(defmethod compile-filter :between
  [[_ field min-val max-val]]
  ;; Rewritten as the conjunction of an inclusive lower and an inclusive upper bound.
  (let [lower-bound [:>= field min-val]
        upper-bound [:<= field max-val]]
    (compile-filter [:and lower-bound upper-bound])))

(defn- str-match-pattern
  "Build a `$regexMatch` expression that matches `field` against `value`, optionally anchored with `prefix`
  (`nil` or \"^\") and/or `suffix` (`nil` or \"$\").

  A `[::not value]` wrapper yields the same expression for the inner value wrapped in `$not`. A `:value`
  clause is spliced into the pattern string directly; any other value is concatenated in Mongo with `$concat`.
  Matching is case sensitive unless `options` has `:case-sensitive` set to a falsey value."
  [field options prefix value suffix]
  (if-not (driver-api/is-clause? ::not value)
    (do
      (assert (and (contains? #{nil "^"} prefix) (contains? #{nil "$"} suffix))
              "Wrong prefix or suffix value.")
      (let [input (->rvalue field)
            regex (if (= (first value) :value)
                    (str prefix (->rvalue value) suffix)
                    {$concat (filterv some?
                                      [(when (some? prefix) {$literal prefix})
                                       (->rvalue value)
                                       (when (some? suffix) {$literal suffix})])})
            flags (if (get options :case-sensitive true) "" "i")]
        {$regexMatch {"input"   input
                      "regex"   regex
                      "options" flags}}))
    {$not (str-match-pattern field options prefix (second value) suffix)}))

;; these are changed to {field {$regex "regex"}} instead of {field #regex} for serialization purposes. When doing
;; native query substitution we need a string and the explicit regex form is better there
(defmethod compile-filter :contains
  [[_ field v opts]]
  ;; unanchored match
  {$expr (str-match-pattern field opts nil v nil)})

(defmethod compile-filter :starts-with
  [[_ field v opts]]
  ;; anchored at the start of the string
  {$expr (str-match-pattern field opts "^" v nil)})

(defmethod compile-filter :ends-with
  [[_ field v opts]]
  ;; anchored at the end of the string
  {$expr (str-match-pattern field opts nil v "$")})

(defn- rvalue-is-variable?
  "Whether `rvalue` is a string starting with `$$`, e.g. `$$NOW`."
  [rvalue]
  (if (string? rvalue)
    (str/starts-with? rvalue "$$")
    false))

(defn- rvalue-is-field?
  "Whether `rvalue` is a string starting with a single `$` (e.g. `$price`) as opposed to a `$$` variable."
  [rvalue]
  (cond
    (not (string? rvalue))       false
    (rvalue-is-variable? rvalue) false
    :else                        (str/starts-with? rvalue "$")))

(defn- rvalue-can-be-compared-directly?
  "Whether `rvalue` is simple enough to be used in the direct comparison form

    {$match {$field {$eq rvalue}}}

  rather than the expression form

    {$match {$expr {$eq [$field rvalue]}}}

  That is the case for field references and for anything that is neither a map, a `$$` variable nor a regex
  pattern."
  [rvalue]
  (or (rvalue-is-field? rvalue)
      (not (or (map? rvalue)
               (rvalue-is-variable? rvalue)
               (instance? java.util.regex.Pattern rvalue)))))

(defn- filter-expr
  "Compile a binary comparison of `field` and `value` with the Mongo comparison `operator`.

  When the left-hand side is a plain field reference and the right-hand side is a simple non-field value, the
  compact form `{field {operator value}}` is produced (or just `{field value}` for `$eq`). Everything else
  needs the expression form, e.g. `{$expr {$lte [{$add [$field 1]} 100]}}`."
  [operator field value]
  (let [lhs     (->rvalue field)
        rhs     (->rvalue value)
        simple? (and (rvalue-is-field? lhs)
                     (not (rvalue-is-field? rhs))
                     (rvalue-can-be-compared-directly? rhs))]
    (if-not simple?
      {$expr {operator [lhs rhs]}}
      ;; the compact form is keyed by the field name without its leading `$`
      (let [field-name (str/replace-first lhs #"^\$" "")]
        {field-name (if (= (name operator) "$eq")
                      rhs
                      {operator rhs})}))))

;; The binary comparison clauses all go through [[filter-expr]] with the matching Mongo operator.
(defmethod compile-filter :=
  [[_ field value]]
  (filter-expr $eq field value))

(defmethod compile-filter :!=
  [[_ field value]]
  (filter-expr $ne field value))

(defmethod compile-filter :<
  [[_ field value]]
  (filter-expr $lt field value))

(defmethod compile-filter :>
  [[_ field value]]
  (filter-expr $gt field value))

(defmethod compile-filter :<=
  [[_ field value]]
  (filter-expr $lte field value))

(defmethod compile-filter :>=
  [[_ field value]]
  (filter-expr $gte field value))

(defmethod compile-filter :and
  [[_ & subclauses]]
  ;; every subclause is compiled and combined under `$and`
  (let [compiled (mapv compile-filter subclauses)]
    {$and compiled}))

(defmethod compile-filter :or
  [[_ & subclauses]]
  ;; every subclause is compiled and combined under `$or`
  (let [compiled (mapv compile-filter subclauses)]
    {$or compiled}))

;; MongoDB doesn't support negating top-level filter clauses. So we can leverage the MBQL lib's `negate-filter-clause`
;; to negate everything, with the exception of the string filter clauses, which we will convert to a `{not <regex>}`
;; clause (see `->rvalue` for `::not` above). `negate` below wraps the MBQL lib function.
;; Takes an MBQL filter clause and returns an MBQL clause again (not compiled Mongo), so callers still
;; have to pass the result through `compile-filter` or `compile-cond`.
(defmulti ^:private negate
  {:arglists '([mbql-clause])}
  driver-api/dispatch-by-clause-name-or-class)

(defmethod negate :default
  [mbql-clause]
  ;; anything without a dedicated method is negated by the shared MBQL helper
  (driver-api/negate-filter-clause mbql-clause))

;; De Morgan: the negation of a conjunction is the disjunction of the negated subclauses, and vice versa.
(defmethod negate :and
  [[_ & subclauses]]
  (into [:or] (map negate) subclauses))

(defmethod negate :or
  [[_ & subclauses]]
  (into [:and] (map negate) subclauses))

;; String filters stay as they are; only their value is wrapped in `::not`, which [[str-match-pattern]]
;; recognizes and turns into a `$not` around the regex match.
(defmethod negate :contains
  [[_ field v opts]]
  [:contains field [::not v] opts])

(defmethod negate :starts-with
  [[_ field v opts]]
  [:starts-with field [::not v] opts])

(defmethod negate :ends-with
  [[_ field v opts]]
  [:ends-with field [::not v] opts])

(defmethod compile-filter :not
  [[_ subclause]]
  ;; negate at the MBQL level first, then compile the resulting clause
  (-> subclause negate compile-filter))

(defmethod compile-filter :expression
  [[_ expression-name]]
  ;; look up the definition of the named expression in the current query and compile that instead
  (compile-filter
   (driver-api/expression-with-name (:query *query*) expression-name)))

(defmethod compile-filter :field
  [field-clause]
  ;; a bare field used as a filter is coerced to a boolean with `$toBool`
  (let [field-rvalue (->rvalue field-clause)]
    {$expr {$toBool field-rvalue}}))

(defmethod compile-filter :value
  [value-clause]
  ;; a bare value used as a filter is wrapped in `$expr` as-is
  (let [value-rvalue (->rvalue value-clause)]
    {$expr value-rvalue}))

(defn- handle-filter
  "Append a `$match` stage for the inner query's `:filter` clause to the `:query` of `pipeline-ctx`. Returns
  `pipeline-ctx` unchanged when there is no filter."
  [{filter-clause :filter} pipeline-ctx]
  (cond-> pipeline-ctx
    filter-clause (update :query conj {$match (compile-filter filter-clause)})))

;; Compiles an MBQL condition to a Mongo aggregation *expression* (e.g. `{$eq [lhs rhs]}`). In this file
;; it is used for the predicates of `:case`, `:sum-where` and `:count-where`.
(defmulti ^:private compile-cond
  {:arglists '([mbql-clause])}
  driver-api/dispatch-by-clause-name-or-class)

(defmethod compile-cond :between
  [[_ field min-val max-val]]
  ;; Rewritten as the conjunction of an inclusive lower and an inclusive upper bound.
  (let [lower-bound [:>= field min-val]
        upper-bound [:<= field max-val]]
    (compile-cond [:and lower-bound upper-bound])))

(defn- index-of-code-point
  "Build an `$indexOfCP` expression locating `needle` within `source`. For case-insensitive searches both
  operands are lower-cased with `$toLower` first.
  See https://docs.mongodb.com/manual/reference/operator/aggregation/indexOfCP/"
  [source needle case-sensitive?]
  (let [prepare (if case-sensitive?
                  ->rvalue
                  (fn [clause] {$toLower (->rvalue clause)}))]
    {:$indexOfCP [(prepare source) (prepare needle)]}))

(defmethod compile-cond :contains
  [[_ field value opts]]
  ;; contained <=> the needle's index is not -1
  (let [case-sensitive? (get opts :case-sensitive true)
        position        (index-of-code-point field value case-sensitive?)]
    {$ne [position -1]}))

(defmethod compile-cond :starts-with
  [[_ field value opts]]
  ;; prefix <=> the needle is found at index 0
  (let [case-sensitive? (get opts :case-sensitive true)
        position        (index-of-code-point field value case-sensitive?)]
    {$eq [position 0]}))

(defmethod compile-cond :ends-with
  [[_ field value opts]]
  ;; Take the trailing substring of `field` that is as long as `value` and compare it with `value`:
  ;; directly with `$eq` when case sensitive, otherwise by checking that `$strcasecmp` yields 0.
  (let [case-sensitive? (get opts :case-sensitive true)
        field-suffix    {:$substrCP [(->rvalue field)
                                     {$subtract [{:$strLenCP (->rvalue field)}
                                                 {:$strLenCP (->rvalue value)}]}
                                     {:$strLenCP (->rvalue value)}]}
        expected        (->rvalue value)]
    (if case-sensitive?
      {$eq [field-suffix expected]}
      {$eq [{$strcasecmp [field-suffix expected]} 0]})))

;; The binary comparison clauses compile both operands and apply the matching Mongo expression operator.
(defmethod compile-cond :=
  [[_ field value]]
  {$eq [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :!=
  [[_ field value]]
  {$ne [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :<
  [[_ field value]]
  {$lt [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :>
  [[_ field value]]
  {$gt [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :<=
  [[_ field value]]
  {$lte [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :>=
  [[_ field value]]
  {$gte [(->rvalue field) (->rvalue value)]})

(defmethod compile-cond :and
  [[_ & subclauses]]
  ;; every subclause is compiled and combined under `$and`
  {$and (mapv compile-cond subclauses)})

(defmethod compile-cond :or
  [[_ & subclauses]]
  ;; every subclause is compiled and combined under `$or`
  {$or (mapv compile-cond subclauses)})

(defmethod compile-cond :not
  [[_ subclause]]
  ;; negate at the MBQL level first, then compile the resulting clause
  (-> subclause negate compile-cond))

(defmethod compile-cond :expression
  [[_ expression-name]]
  ;; look up the definition of the named expression in the current query and compile that instead
  (compile-cond
   (driver-api/expression-with-name (:query *query*) expression-name)))

;; Bare fields and values are already valid expressions, so their rvalue is used unchanged.
(defmethod compile-cond :field
  [clause]
  (->rvalue clause))

(defmethod compile-cond :value
  [clause]
  (->rvalue clause))

;;; ----------------------------------------------------- joins ------------------------------------------------------

(defn- find-source-collection
  "Find the name of the collection a :join clause reads from. At each level a :collection entry wins, then
  the name of the table referenced by :source-table; failing both, the search continues in :source-query.
  Returns nil when nothing is found."
  [join-or-query]
  (loop [node join-or-query]
    (or (:collection node)
        (some->> node :source-table (driver-api/table (driver-api/metadata-provider)) :name)
        (when-let [inner (:source-query node)]
          (recur inner)))))

(defn- localize-join-alias
  "Rename :join-alias properties fields to ::join-local.
  See [[find-mapped-field-name]] for an explanation why this is done."
  [expr alias]
  ;; Only `:field` clauses whose options carry `:join-alias` equal to `alias` are meant to be rewritten
  ;; NOTE(review): this relies on the pattern treating the local `alias` as a value to match -- confirm.
  (driver-api/replace expr
    [:field _ {:join-alias alias}]
    ;; `&match` is the matched clause; index 2 is its options map
    (update &match 2 set/rename-keys {:join-alias ::join-local})))

(defn- get-field-mappings
  "Pair up, positionally, the :fields, :breakout and :aggregation clauses of `source-query` (in that order)
  with `projections`. Returns a map from clause to projection, or nil when there is no source query."
  [source-query projections]
  (when source-query
    (let [clauses (concat (:fields source-query)
                          (:breakout source-query)
                          (:aggregation source-query))]
      (zipmap clauses projections))))

;; Forward declaration: [[compile-join-source]] below calls `mbql->native-rec` before its definition.
(declare ^:private mbql->native-rec)

(defn- compile-join-source
  "Compile the `source-query` of a join clause. Returns nil without a source query, otherwise a map holding
  the projections under :projections and the pipeline under :query.

  A native source query contributes its own :projections and the :query of its :native part. An MBQL source
  query is compiled recursively with `*query*` rebound to a query that keeps only :database and :type and
  uses `source-query` as its :query."
  [source-query]
  (let [native (:native source-query)]
    (cond
      (not source-query)
      nil

      native
      {:projections (:projections source-query)
       :query       (:query native)}

      :else
      (binding [*query* (-> *query*
                            (select-keys [:database :type])
                            (assoc :query source-query))]
        (mbql->native-rec source-query)))))

;; Adds the stages implementing a single join to `pipeline-ctx`: a `$lookup` running the (optional)
;; compiled source query followed by a `$match` on the join condition, and a `$unwind` of the looked-up
;; array. The projections of the source query are appended to the context's :projections.
;; Argument order is (accumulator, join) so that it can be used directly with `reduce`.
(defn- handle-join [pipeline-ctx
                    {:keys [alias condition source-query strategy] :as join}]
  ;; without a source query both the projections and the inner pipeline default to empty vectors
  (let [{:keys [projections], pipeline :query, :or {projections [], pipeline []}} (compile-join-source source-query)
        ;; Get the mappings introduced by the source query.
        source-field-mappings (get-field-mappings source-query projections)
        ;; Find the fields the join condition refers to that are not coming from the joined query.
        ;; These have to be bound in the :let property of the $lookup stage, they cannot be referred to directly.
        own-fields (driver-api/match condition
                     [:field _ (_ :guard #(not= (:join-alias %) alias))])
        ;; Map the own fields to a fresh alias and to its rvalue.
        mapping (map (fn [f] (let [alias (-> (format "let_%s_" (->lvalue f))
                                             ;; ~ in let aliases provokes a parse error in Mongo. For correct function,
                                             ;; aliases should also contain no . characters (#32182).
                                             ;; - Spaces are allowed in columns and need to be replaced in let (#52807)
                                             (str/replace #"[~\. -]" "_")
                                             (str "__" (next-alias-index)))]
                               {:field f, :rvalue (->rvalue f), :alias alias}))
                     own-fields)]
    ;; Add the mappings from the source query and the let bindings of $lookup to the field mappings.
    ;; In the join pipeline the let bindings have to referenced with the prefix $$, so we add $ to the name.
    (binding [*field-mappings* (merge *field-mappings*
                                      source-field-mappings
                                      (into {} (map (juxt :field #(str \$ (:alias %)))) mapping))]
      ;; the join condition is compiled inside the binding so that it sees the mappings added above
      (let [pipeline (cond-> pipeline
                       condition (conj {$match (compile-filter (localize-join-alias condition alias))}))
            lookup-as (get-join-alias alias)
            stages [{$lookup {:from (find-source-collection join)
                              :let (into {} (map (juxt :alias :rvalue)) mapping)
                              :pipeline pipeline
                              :as lookup-as}}
                    {$unwind {:path (str \$ lookup-as)
                              ;; left and inner joins are supported, the default is left join
                              :preserveNullAndEmptyArrays (not= strategy :inner-join)}}]]
        (-> pipeline-ctx
            (update :projections into projections)
            (update :query into stages))))))

(defn- handle-joins
  "Fold every join in the inner query's :joins into `pipeline-ctx` with [[handle-join]], in order."
  [inner-query pipeline-ctx]
  (let [joins (:joins inner-query)]
    (reduce handle-join pipeline-ctx joins)))

;;; -------------------------------------------------- aggregation ---------------------------------------------------

(def ^:private aggregation-op
  "The aggregation operators that [[aggregation->rvalue]] and [[expand-aggregation]] deal with."
  #{;; plain aggregations
    :avg :count :distinct :max :min :stddev :sum
    ;; conditional aggregations
    :count-where :sum-where
    ;; aggregations that need a post-processing step
    :share :var
    ;; cumulative aggregations
    :cum-count :cum-sum})

(defmethod ->rvalue :case [[_ cases options]]
  ;; Each `[predicate expression]` pair becomes a `$switch` branch; the `:default` option supplies the
  ;; fallback value.
  (let [->branch (fn [[pred expr]]
                   {:case (compile-cond pred)
                    :then (->rvalue expr)})]
    {:$switch {:branches (mapv ->branch cases)
               :default  (->rvalue (:default options))}}))

;; Compiles a single MBQL aggregation clause to the accumulator expression used for it in this file's
;; `$group` stage (see `expand-aggregation`). Throws an `:invalid-query` exception for unknown clauses.
(defn- aggregation->rvalue [ag]
  (driver-api/match-one ag
    ;; named aggregations: the options are ignored, only the wrapped aggregation is compiled
    [:aggregation-options ag' _]
    (recur ag')

    ;; row count
    [:count]
    {$sum 1}

    ;; count with an argument: adds 1 when the compiled argument is truthy, 0 otherwise
    [:count arg]
    {$sum {$cond {:if   (->rvalue arg)
                  :then 1
                  :else 0}}}

    ;; these aggregation types can all be used in expressions as well so their implementations live above in the
    ;; general [[->rvalue]] implementations
    #{:avg :stddev :sum :min :max}
    (->rvalue &match)

    ;; collects the set of values; turning it into a count is done elsewhere (see
    ;; `adjust-distinct-aggregations`, which wraps references to it in `$size`)
    [:distinct arg]
    {$addToSet (->rvalue arg)}

    ;; sums `arg` over the rows satisfying `pred`; other rows contribute 0
    [:sum-where arg pred]
    {$sum {$cond {:if   (compile-cond pred)
                  :then (->rvalue arg)
                  :else 0}}}

    ;; a conditional count is a conditional sum of the constant 1
    [:count-where pred]
    (recur [:sum-where [:value 1] pred])

    :else
    (throw
     (ex-info (tru "Don''t know how to handle aggregation {0}" ag)
              {:type :invalid-query, :clause ag}))))

(defn- unwrap-named-ag
  "Strip any number of nested `:aggregation-options` wrappers from `ag` and return the innermost clause."
  [ag]
  (loop [[ag-type wrapped :as clause] ag]
    (if (= ag-type :aggregation-options)
      (recur wrapped)
      clause)))

(defn- field-alias
  "The alias to use for `field-ref`: the desired alias recorded in its options if there is one, otherwise its
  lvalue."
  [[_tag _id-or-name opts, :as field-ref]]
  (if-let [desired (driver-api/qp.add.desired-alias opts)]
    desired
    (->lvalue field-ref)))

(mu/defn- breakouts-and-ags->projected-fields :- [:maybe [:sequential [:tuple driver-api/schema.common.non-blank-string :any]]]
  "Work out the field projections for MBQL breakouts and aggregations. Returns a sequence of
  `[projected-field-name source]` pairs: breakouts are read from below `$_id`, aggregations are projected
  with `true`."
  [breakout-fields aggregations]
  (let [breakout->pair (fn [field-or-expr]
                         (let [projected-name (field-alias field-or-expr)]
                           [projected-name (format "$_id.%s" projected-name)]))
        ag->pair       (fn [ag]
                         [(driver-api/aggregation-name (:query *query*) ag) true])]
    (concat (map breakout->pair breakout-fields)
            (map ag->pair aggregations))))

;; Dispatches on the operator of the aggregation after stripping any `:aggregation-options` wrappers.
;; Methods return a map with a `:group` entry and, depending on the aggregation, `:post` or `:window`.
(defmulti ^:private expand-aggregation
  "Expand aggregations like `:share` and `:var` that can't be done as top-level aggregations in the `$group` stage
  alone. See [[group-and-post-aggregations]] for more info. See also
  https://www.mongodb.com/docs/manual/reference/operator/aggregation/group/#accumulator-operator for a list of what
  aggregation operators are allowed inside `$group` (vs the ones that have to be done in a later stage)."
  {:arglists '([mbql-clause])}
  (comp first unwrap-named-ag))

;;; * `:group` = stuff to do in the `$group` stage
;;;
;;; * `:post` = stuff to do in the `$addFields` stage(s) immediately following it
;;;
;;; * `:window` = cumulative aggregations to compute in a `$setWindowFields` stage
;;;
;;; `:group` and `:window` are maps of LHS column name => RHS definition; `:post` is a vector of such maps
;;;
;;; Note that this code doesn't handle expression aggregations, but that's ok because we do not support
;;; `:expression-aggregations` for Mongo DB.

(defmethod expand-aggregation :share
  [[_ pred :as ag]]
  ;; share = (rows matching the predicate) / (all rows). Both counts are accumulated in the `$group` stage
  ;; under generated names and divided afterwards.
  (let [count-where-field (name (gensym "count-where-"))
        count-field       (name (gensym "count-"))
        ;; when `ag` is an `:aggregation-options` wrapper, its second element is the `[:share pred]` clause
        ;; itself rather than the predicate
        pred              (if (= (first pred) :share)
                            (second pred)
                            pred)]
    {:group {count-where-field (aggregation->rvalue [:count-where pred])
             count-field       (aggregation->rvalue [:count])}
     :post  [{(driver-api/aggregation-name (:query *query*) ag)
              {$divide [(str \$ count-where-field) (str \$ count-field)]}}]}))

;; MongoDB doesn't have a variance operator, but you calculate it by taking the square of the standard deviation.
;; However, `$pow` is not allowed in the `$group` stage. So calculate standard deviation in the `$group` stage and
;; square it in the post-processing step that follows.
(defmethod expand-aggregation :var
  [ag]
  ;; The standard deviation is accumulated in the `$group` stage under a generated name and squared with
  ;; `$pow` afterwards to obtain the variance.
  (let [[_ expr]     (unwrap-named-ag ag)
        stddev-field (name (gensym "stddev-"))]
    {:group {stddev-field (aggregation->rvalue [:stddev expr])}
     :post  [{(driver-api/aggregation-name (:query *query*) ag)
              {:$pow [(str \$ stddev-field) 2]}}]}))

(defmethod expand-aggregation :cum-sum
  [ag]
  ;; A regular sum is accumulated in the `$group` stage under a generated name; the `:window` entry maps
  ;; the aggregation's name to a reference to that sum.
  (let [[_ expr]  (unwrap-named-ag ag)
        sum-field (name (gensym "sum-"))]
    {:group  {sum-field (aggregation->rvalue [:sum expr])}
     :window {(driver-api/aggregation-name (:query *query*) ag) (str \$ sum-field)}}))

(defmethod expand-aggregation :cum-count
  [ag]
  ;; A regular count is accumulated in the `$group` stage under a generated name; the `:window` entry maps
  ;; the aggregation's name to a reference to that count.
  (let [count-field (name (gensym "count-"))]
    {:group  {count-field (aggregation->rvalue [:count])}
     :window {(driver-api/aggregation-name (:query *query*) ag) (str \$ count-field)}}))

(defmethod expand-aggregation :default
  [ag]
  ;; Everything else can be computed entirely within the `$group` stage, under the aggregation's own name.
  (let [ag-name (driver-api/aggregation-name (:query *query*) ag)]
    {:group {ag-name (aggregation->rvalue ag)}}))

(defn- extract-aggregations
  "Extract aggregation expressions embedded in `aggr-expr` using `parent-name`
  as a namespace for the names introduced for the aggregation expressions.
  The function returns a pair with the first element an expression like
  `aggr-expr` with aggregations replaced by new names. The second element of
  the pair is a map from the extracted aggregations to the new names conjoined
  on `aggregations-seen`. `:aggregation-option`s are ignored.

  For example, given \"expression\" as `parent-name`, the expression

  [:aggregation-options [:+ [:count [:field 1144 nil]]
                            [:* [:count [:field 1144 nil]]
                                [:sum [:+ [:field 1142 nil] 1]]]]
                        {:name \"expression\"}]
  is mapped to

  [[:+ \"$expression~count\" [:* \"$expression~count\" \"$expression~sum\"]]
   {[:count [:field 1144 nil]] \"expression~count\"
    [:sum [:+ [:field 1142 nil] 1]] \"expression~sum\"}]"
  ([aggr-expr parent-name] (extract-aggregations aggr-expr parent-name {}))
  ([aggr-expr parent-name aggregations-seen]
   ;; anything that is not a non-empty vector is a leaf and is returned untouched
   (if (and (vector? aggr-expr) (seq aggr-expr))
     (let [[op & args] aggr-expr
           seen (get aggregations-seen aggr-expr)]
       (cond
         ;; an identical aggregation was extracted before: reuse its name
         seen
         [(str \$ seen) aggregations-seen]

         ;; unwrap and continue with the wrapped aggregation; the options themselves are dropped
         (= :aggregation-options op)
         (extract-aggregations (first args) parent-name aggregations-seen)

         ;; an aggregation: replace it by a `$`-prefixed reference to a fresh name and record the mapping
         (aggregation-op op)
         (let [aliases-taken (set (vals aggregations-seen))
               aggr-name (driver-api/aggregation-name (:query *query*) aggr-expr)
               desired-alias (str parent-name "~" aggr-name)
               ;; find a free alias by appending increasing integers
               ;; to the desired alias
               aggr-name (some (fn [suffix]
                                 (let [alias (str desired-alias suffix)]
                                   (when-not (aliases-taken alias)
                                     alias)))
                               (cons "" (iterate inc 1)))]
           [(str \$ aggr-name) (assoc aggregations-seen aggr-expr aggr-name)])

         ;; any other clause: keep the operator and process the arguments left to right, threading the
         ;; aggregations seen so far through the recursion
         :else
         (reduce (fn [[ges as] arg]
                   (let [[ge as] (extract-aggregations arg parent-name as)]
                     [(conj ges ge) as]))
                 [[op] aggregations-seen]
                 args)))
     [aggr-expr aggregations-seen])))

(defn- simplify-extracted-aggregations
  "Drop the namespacing from `extracted-aggr` (typically the result of [[extract-aggregations]]) when the whole
  expression is nothing but a reference to a single extracted aggregation. In that case `aggr-name` itself can
  serve as the name of the group introduced for the aggregation. Otherwise `extracted-aggr` is returned
  unchanged."
  [aggr-name [aggr-expr aggregations-seen :as extracted-aggr]]
  (let [lone-reference? (and (string? aggr-expr)
                             (str/starts-with? aggr-expr (str \$ aggr-name "~"))
                             (= (count aggregations-seen) 1))
        [aggr-group alias] (when lone-reference?
                             (first aggregations-seen))]
    (if (and lone-reference?
             ;; the reference has to point at the one aggregation that was extracted
             (= alias (subs aggr-expr 1))
             aggr-group)
      [(str \$ aggr-name) {aggr-group aggr-name}]
      extracted-aggr)))

(defn- adjust-distinct-aggregations
  "Wrap every identifier in `aggr-expr` that refers to a `:distinct` aggregation in `{$size ...}`.

  The argument is an `[aggr-expr mappings]` pair as produced by [[extract-aggregations]] (see its docstring for
  details); the mappings are returned unchanged.

  Distinct values are collected with `$addToSet` in a `$group` stage, so a reference to them denotes an array.
  `$size` turns that array into the actual count."
  [[aggr-expr mappings]]
  (let [;; `mappings` holds bare names such as `q~count1`, whereas `aggr-expr` refers to them with a `$`
        ;; prefix (`$q~count1`), so the prefix is added before comparing.
        distinct-refs (into #{}
                            (keep (fn [[[op] alias]]
                                    (when (= :distinct op)
                                      (str \$ alias))))
                            mappings)
        wrap-distinct (fn [form]
                        (if (and (string? form)
                                 (distinct-refs form))
                          {$size form}
                          form))]
    [(perf/postwalk wrap-distinct aggr-expr)
     mappings]))

(defn- expand-aggregations
  "Split the aggregations in `aggr-expr` into groupings and follow-up expressions. Returns a map with the
  keys:
  `:group` - a map with the groups of the aggregation expression,
  `:post` - a vector of maps with the expressions referring to the fields generated by the groups. Each map
  in the vector may (and usually does) refer to fields introduced by the maps before it,
  `:window` - a map with the `:window` entries of the expanded aggregations."
  [aggr-expr]
  (let [aggr-name   (driver-api/aggregation-name (:query *query*) aggr-expr)
        extracted   (extract-aggregations aggr-expr aggr-name)
        simplified  (simplify-extracted-aggregations aggr-name extracted)
        [rewritten extracted-ags] (adjust-distinct-aggregations simplified)
        compiled    (->rvalue rewritten)
        expansions  (for [[aggr alias] extracted-ags]
                      (expand-aggregation [:aggregation-options aggr {:name alias}]))
        ;; a final step computing the expression itself is only needed when the expression is more than a
        ;; reference to a group of the same name
        needs-final-step? (not= compiled (str \$ aggr-name))]
    {:group  (into {} (map :group) expansions)
     :post   (cond-> [(into {} (mapcat :post) expansions)]
               needs-final-step? (conj {aggr-name compiled}))
     :window (into {} (map :window) expansions)}))

(defn- order-postprocessing
  "Merge a sequence of post-processing vectors (see [[expand-aggregations]]) index by index: the result holds
  one map per position, combining the maps found at that position in every vector. Returns nil for empty
  input.
  This keeps the number of pipeline stages down and relies on two assumptions:
    a) a map only depends on maps preceding it in its own vector and
    b) the keys of the maps at the same position are unique."
  [posts]
  (when (seq posts)
    (let [depth (apply max (map count posts))
          level (fn [i]
                  (into {} (map #(get % i)) posts))]
      (map level (range depth)))))

(mu/defn- order-by->$sort :- [:map-of driver-api/schema.common.non-blank-string [:enum -1 1]]
  "Translate MBQL order-by clauses into the body of a `$sort` stage: an insertion-ordered map from field
  lvalue to 1 (`:asc`) or -1 (`:desc`)."
  [order-by :- [:sequential driver-api/mbql.schema.OrderBy]]
  (reduce (fn [sort-spec [direction field]]
            (assoc sort-spec
                   (->lvalue field)
                   (case direction
                     :asc   1
                     :desc -1)))
          (ordered-map/ordered-map)
          order-by))

(defn- window-output-clause
  "Given `input-name`, generate an output clause suitable for a `$setWindowFields` output block: a `$sum` of
  `input-name` over all documents from the start of the partition up to the current one."
  [input-name]
  (let [running-window {"documents" ["unbounded" "current"]}]
    {$sum     input-name
     "window" running-window}))

(defn- sort-lookup
  "The path under which `name` can be found: prefixed with `_id.` when `id` has a truthy entry for `name`,
  otherwise `name` itself."
  [id name]
  (cond->> name
    (id name) (str "_id.")))

(defn- window-sort
  "Turn the body of a `$sort` (a sequence of `[name direction]` pairs) into something usable as the `sortBy`
  clause of a `$setWindowFields` stage, keeping the order of the pairs. Returns nil when there are no pairs."
  [id pairs]
  (when (seq pairs)
    (reduce (fn [sort-by [field-name dir]]
              (assoc sort-by (sort-lookup id field-name) dir))
            (ordered-map/ordered-map)
            pairs)))

(defn- window-sort-and-partitions
  "Calculates the appropriate sort and partition fields for a `$setWindowFields` stage."
  [id breakouts order-by]
  ;; Returns a map with :sort-expr and :partition-expr.
  ;; NOTE(review): `id` is indexed positionally below, so its entries are presumably in the same order as
  ;; `breakouts` -- confirm against the caller.
  (let [finest-temporal-index
        (driver-api/finest-temporal-breakout-index breakouts 2)

        ;; the entry to sort by: the finest temporal breakout if there is one, otherwise the last breakout
        sort-index (or finest-temporal-index
                       (dec (count breakouts)))
        sort-name (first (nth (seq id) sort-index))
        default-sort {(sort-lookup id sort-name) 1}
        ;; the user's ordering is compiled with every breakout mapped to its alias
        user-sort (when order-by
                    (binding [*field-mappings*
                              (merge *field-mappings*
                                     (into {} (map (juxt identity field-alias)) breakouts))]
                      (order-by->$sort order-by)))
        ;; the first alternative below that yields a non-nil sort wins
        sort-expr (or
                   ;; if there is only one breakout, always use the user's sort order
                   (when (= (count id) 1)
                     (window-sort id user-sort))

                   ;; if we don't have a temporal breakout, sort by the last breakout, but
                   ;; use the user's sort direction if specified
                   (when-not finest-temporal-index
                     (->> user-sort
                          (filter #(= sort-name (first %)))
                          (window-sort id)))

                   default-sort)

        ;; every entry of `id` except the one sorted by becomes a partition key
        partition-expr (into {}
                             (map (fn [[name]] [name (str "$_id." name)]))
                             (m/remove-nth sort-index id))]
    {:sort-expr sort-expr
     :partition-expr partition-expr}))

(defn- window-accumulators
  "Given `window-vals`, a map of {output-name input-name ...}, generate the stage (returned in a vector) that
  produces the cumulative sums of those fields. With breakouts this is a `$setWindowFields` stage; with an
  empty `id` there are no breakouts to accumulate over and a plain `$addFields` is enough."
  [window-vals id breakouts order-by]
  (if-not (empty? id)
    (let [{:keys [sort-expr partition-expr]} (window-sort-and-partitions id breakouts order-by)
          base-spec {"sortBy" sort-expr
                     "output" (update-vals window-vals window-output-clause)}]
      ;; `partitionBy` is only included when there is something to partition by
      [{$setWindowFields (cond-> base-spec
                           (seq partition-expr) (assoc "partitionBy" partition-expr))}])
    [{$addFields window-vals}]))

(defn- group-and-post-aggregations
  "Mongo is picky about which top-level aggregations it allows with groups. Eg. even
   though [:/ [:count-if ...] [:count]] is a perfectly fine reduction, it's not allowed. Therefore
   more complex aggregations are split in two: the reductions are done in the `$group` stage, after
   which postprocessing in `$addFields` stages arrives at the final result.
   The groups are assumed to be independent and are collapsed into a single stage, but separate
   `$addFields` stages are created for post processing so that stages can refer to the results
   of preceding stages.
   The intermediate results accrued in the `$group` stage are discarded in the final `$project` stage.
   Meanwhile, cumulative aggregations cannot be done in either a `$group` or an `$addFields` stage
   and instead need their own `$setWindowFields` stage."
  [id breakouts aggregations order-by]
  (let [expansions     (map expand-aggregations aggregations)
        group-stage    {$group (into (ordered-map/ordered-map "_id" id)
                                     (mapcat :group expansions))}
        post-maps      (order-postprocessing (map :post expansions))
        window-outputs (into {} (map :window) expansions)
        window-stages  (when (seq window-outputs)
                         (window-accumulators window-outputs id breakouts order-by))
        ;; empty post-processing maps would only produce pointless stages
        post-stages    (keep (fn [post-map]
                               (when (seq post-map)
                                 {$addFields post-map}))
                             post-maps)]
    (-> [group-stage]
        (into window-stages)
        (into post-stages))))

(defn- projection-group-map
  "Build the (ordered, possibly nested) map used as the `$group` `_id` for `fields`.

  Each clause is stored under a key path and mapped to its `->rvalue`:

  * `:field` with an integer ID -- the clause's `field-alias` split on `.`, so dotted aliases become nested maps;
  * `:field` with a string name -- the single-segment path `[field-name]`;
  * `:expression`               -- the single-segment path `[expr-name]`."
  [fields]
  (letfn [(group-path [clause]
            (driver-api/match-one clause
              [:field (field-id :guard integer?) _]
              (str/split (field-alias clause) #"\.")

              [:field (field-name :guard string?) _]
              [field-name]

              [:expression expr-name _]
              [expr-name]))
          (add-clause [acc clause]
            (assoc-in acc (group-path clause) (->rvalue clause)))]
    (reduce add-clause (ordered-map/ordered-map) fields)))

(defn- breakouts-and-ags->pipeline-stages
  "Return a sequence of aggregation pipeline stages needed to implement MBQL breakouts and aggregations: the
  `$group` (and follow-up) stages, a `$sort` on `_id`, and a final `$project` back to `projected-fields`.
  Any `nil` stages are dropped."
  [projected-fields breakout-fields aggregations order-by]
  (let [;; the $group `_id`; nil when there are no breakouts
        group-id       (when (seq breakout-fields)
                         (projection-group-map breakout-fields))
        grouping       (group-and-post-aggregations group-id breakout-fields aggregations order-by)
        sort+project   [;; Sort by _id (group)
                        {$sort {"_id" 1}}
                        ;; now project back to the fields we expect
                        {$project (into (ordered-map/ordered-map "_id" false)
                                        projected-fields)}]]
    (concat (remove nil? grouping)
            (remove nil? sort+project))))

(defn- handle-breakout+aggregation
  "Extend the Query pipeline context (`pipeline-ctx`) with the projections, groupings and sortings required by
  the MBQL `:aggregation` and `:breakout` clauses. Returns `pipeline-ctx` untouched when the query has neither."
  [{breakout-fields :breakout, aggregations :aggregation, :keys [order-by]} pipeline-ctx]
  (if (or (seq aggregations) (seq breakout-fields))
    ;; projected-fields is like [[projected-field-name source]]
    (let [projected-fields (breakouts-and-ags->projected-fields breakout-fields aggregations)
          stages           (breakouts-and-ags->pipeline-stages projected-fields breakout-fields aggregations order-by)
          ;; :projections is just the names, in order
          projection-names (into [] (map (fn [[projected-name]] projected-name)) projected-fields)]
      (-> pipeline-ctx
          (assoc :projections projection-names)
          ;; append the new stages to the end of :query
          (update :query into stages)))
    ;; nothing to aggregate or break out by
    pipeline-ctx))

;;; ---------------------------------------------------- order-by ----------------------------------------------------

(defn- remove-parent-fields
  "Removes any and all entries in `fields` that are parents of another field in `fields`. This is necessary because as
  of MongoDB 4.4, including both will result in an error (see:
  `https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#path-collision-errors-in-embedded-fields`).

  Removing parents is useful when sorting, because leaf fields sort.

  Only `:field` clauses with an integer ID are ever considered, both when collecting parent IDs and when deciding
  what to remove; every other clause (expressions, aggregation references, named fields) is passed through
  unchanged. Returns a lazy sequence in the original order."
  [fields]
  (let [;; IDs of every field that is the parent of some integer-ID :field clause in `fields`
        parent-ids (into #{}
                         (keep (fn [[clause-type field-id]]
                                 (when (and (= clause-type :field)
                                            (integer? field-id))
                                   (:parent-id (driver-api/field (driver-api/metadata-provider) field-id)))))
                         fields)]
    (remove (fn [[clause-type field-id]]
              ;; check the clause tag as well: without it a clause such as `[:aggregation 3]` would be dropped
              ;; merely because its index collides with a parent field ID
              (and (= clause-type :field)
                   (integer? field-id)
                   (contains? parent-ids field-id)))
            fields)))

(defn- remove-child-fields
  "Drops every entry of `fields` whose parent field is also present in `fields`. MongoDB 4.4+ raises an error
  when both a parent and one of its children are included (see:
  `https://www.mongodb.com/docs/manual/reference/operator/aggregation/project/#path-collision-errors-in-embedded-fields`).

  Dropping the children is the right choice when projecting: the result of a mongo query is json, so a parent
  already carries all of its children. Only `:field` clauses with an integer ID take part; anything else is
  kept as-is."
  [fields]
  (letfn [(integer-field-id [[clause-type id-or-name]]
            (when (and (= clause-type :field)
                       (integer? id-or-name))
              id-or-name))]
    (let [present-ids (into #{} (map integer-field-id) fields)]
      (remove (fn [clause]
                (when-let [id (integer-field-id clause)]
                  (let [parent-id (:parent-id (driver-api/field (driver-api/metadata-provider) id))]
                    (and parent-id (contains? present-ids parent-id)))))
              fields))))

;; Adds the stages needed for the MBQL `:order-by` clause to `pipeline-ctx`:
;;
;; * an optional `$addFields` stage that computes order-by targets which are not already available in the
;;   documents (expressions and temporally-bucketed fields that are not breakouts), and
;; * an optional `$sort` stage combining the user's explicit ordering with the extra ordering required by
;;   cumulative aggregations.
;;
;; Returns `pipeline-ctx` unchanged when neither stage is needed.
(defn- handle-order-by [{:keys [order-by breakout aggregation]} pipeline-ctx]
  (let [breakout-fields (set breakout)
        ;; [lvalue rvalue] pairs for the order-by targets (the second element of each order-by clause) that
        ;; have to be computed before we can sort on them. Parent fields are dropped first.
        sort-fields (for [field (remove-parent-fields (map second order-by))
                          ;; We only care about expressions and bucketing not added as breakout
                          :when (and (not (contains? breakout-fields field))
                                     (let [dispatch-value
                                           (driver-api/dispatch-by-clause-name-or-class field)]
                                       (or (= :expression dispatch-value)
                                           (and (= :field dispatch-value)
                                                (let [[_ _ {:keys [temporal-unit]}] field]
                                                  (and (some? temporal-unit)
                                                       (not= temporal-unit :default)))))))]
                      [(->lvalue field) (->rvalue field)])
        ;; We have already compiled breakout fields into the document.
        ;; Maps each breakout clause to its `field-alias`.
        breakout-field-mappings (into {} (map (juxt identity field-alias)) breakout)
        ;; We have already sorted ascending by the breakout fields so we don't have to repeat the
        ;; same sort.
        ;; nil when there is no order-by, or when it is exactly "ascending by each breakout, in order".
        explicit-order-by
        (when (and (seq order-by)
                   (not= order-by (map (fn [field] [:asc field]) breakout)))
          ;; compile with the breakout aliases visible on top of the current mappings
          (binding [*field-mappings* (merge *field-mappings* breakout-field-mappings)]
            (order-by->$sort order-by)))

        ;; Only computed when some aggregation's second element is a :cum-sum/:cum-count clause AND
        ;; `finest-temporal-breakout-index` returns an index. Yields [name 1] (ascending) pairs for the keys
        ;; of the group map, with the key at that index moved to the end and any key that already has an
        ;; entry in `explicit-order-by` left out.
        ;; NOTE(review): `explicit-order-by` is invoked as a function below, so `order-by->$sort` presumably
        ;; returns a map -- confirm.
        cumulative-order-by
        (when-let [finest-temporal-index
                   (and (seq (filter (fn [[_ [agg-type]]] (#{:cum-sum :cum-count} agg-type)) aggregation))
                        (driver-api/finest-temporal-breakout-index breakout 2))]
          (let [id (projection-group-map breakout)]
            (as-> (keys id) lst
              (m/remove-nth finest-temporal-index lst)
              (concat lst [(nth (keys id) finest-temporal-index)])
              (filter (fn [key] (not (and explicit-order-by
                                          (explicit-order-by key)))) lst)
              (map (fn [name] [name 1]) lst))))

        ;; a single $sort stage: explicit entries first, cumulative entries after; nil when both are absent
        combined-order-by
        (when (or explicit-order-by cumulative-order-by)
          {$sort (into (ordered-map/ordered-map)
                       (concat explicit-order-by cumulative-order-by))})]
    (cond-> pipeline-ctx
      (seq sort-fields) (update :query conj
                                ;; We $addFields before sorting, otherwise expressions will not be available for the sort
                                {$addFields (into (ordered-map/ordered-map) sort-fields)})
      combined-order-by (update :query #(conj % combined-order-by)))))

;;; ----------------------------------------------------- fields -----------------------------------------------------

(defn- handle-fields
  "Add a `$project` stage for the MBQL `:fields` clause to `pipeline-ctx` and record the projected names under
  `:projections`. Returns `pipeline-ctx` unchanged when `fields` is empty.

  `:projections` is built eagerly as a vector. A lazy sequence here could escape the dynamic scope in which
  this handler runs (e.g. the `*field-mappings*` binding established while compiling nested queries) and only
  be realized afterwards; a vector also matches what the breakout/aggregation handler and the initial
  pipeline context use."
  [{:keys [fields]} pipeline-ctx]
  (if-not (seq fields)
    pipeline-ctx
    (let [new-projections (for [field (remove-child-fields fields)]
                            [(field-alias field) (->rvalue field)])]
      (-> pipeline-ctx
          ;; we can't ask mongo for both a parent field and its child at the same time, because mongo will throw an
          ;; error. It's also unnecessary, because the parent includes the child. However, we need to list all fields
          ;; we think we want in :projections so that we know to look for them all once we get data back.
          (assoc :projections (mapv field-alias fields))
          ;; add project _id = false to keep _id from getting automatically returned unless explicitly specified
          (update :query conj {$project (into
                                         (ordered-map/ordered-map "_id" false)
                                         new-projections)})))))

;;; ----------------------------------------------------- limit ------------------------------------------------------

(defn- handle-limit
  "Append a `$limit` stage to the `:query` of `pipeline-ctx` when the MBQL query has a `:limit`; otherwise
  return `pipeline-ctx` unchanged."
  [{:keys [limit]} pipeline-ctx]
  (cond-> pipeline-ctx
    limit (update :query conj {$limit limit})))

;;; ------------------------------------------------------ page ------------------------------------------------------

(defn- handle-page
  "Append the stages for the MBQL `:page` clause to the `:query` of `pipeline-ctx`: a `$skip` stage (omitted
  when the offset is zero, i.e. for the first page) followed by a `$limit` of `:items`. Returns `pipeline-ctx`
  unchanged when there is no `:page` clause.

  `:query` is kept a vector. Appending with `concat` would turn it into a lazy seq, and any later `conj` on the
  context (for instance when the compiled result is reused as the starting context of an outer query) would
  then add stages to the FRONT of the pipeline instead of the end."
  [{{page-num :page, items-per-page :items, :as page-clause} :page} pipeline-ctx]
  (if-not page-clause
    pipeline-ctx
    (let [offset      (* items-per-page (dec page-num))
          page-stages (cond-> []
                        (not (zero? offset)) (conj {$skip offset})
                        true                 (conj {$limit items-per-page}))]
      ;; `vec` first so ordering is preserved even if :query arrives as a non-vector sequence
      (update pipeline-ctx :query #(into (vec %) page-stages)))))

;;; +----------------------------------------------------------------------------------------------------------------+
;;; |                                                 Process & Run                                                  |
;;; +----------------------------------------------------------------------------------------------------------------+

(defn- add-aggregation-pipeline
  "Run `inner-query` through every clause handler in turn, threading the pipeline context (a map with
  `:projections` and `:query`) through each one. With a single argument, starts from an empty context.

  Handlers are referenced as vars and are applied in this fixed order: joins, filter, breakout+aggregation,
  order-by, fields, limit, page."
  ([inner-query]
   (add-aggregation-pipeline inner-query {:projections [], :query []}))
  ([inner-query pipeline-ctx]
   (let [handlers [#'handle-joins
                   #'handle-filter
                   #'handle-breakout+aggregation
                   #'handle-order-by
                   #'handle-fields
                   #'handle-limit
                   #'handle-page]]
     (reduce (fn [ctx handler]
               (handler inner-query ctx))
             pipeline-ctx
             handlers))))

(mu/defn- generate-aggregation-pipeline :- [:map
                                            [:projections Projections]
                                            [:query Pipeline]]
  "Generate the aggregation pipeline for `inner-query`, starting from an empty pipeline context. Returns a map
  with `:projections` (the projected names) and `:query` (the pipeline stages); both the argument and the
  return value are schema-checked by `mu/defn-`."
  [inner-query :- driver-api/MBQLQuery]
  (add-aggregation-pipeline inner-query))

(defn- query->collection-name
  "Return `:collection` from a source query, if it exists.

  Looks through `query` for a map with a truthy `:collection` key and returns that value, but only when the
  map sits somewhere below a `:source-query` key and not below a `:joins` key. Returns nil when nothing
  qualifies."
  [query]
  (driver-api/match-one query
    (_ :guard (every-pred map? :collection))
    ;; ignore source queries inside `:joins` or `:collection` outside of a `:source-query`
    ;; (`&parents` and `&match` are supplied by `match-one`: the keys leading to the match and the match itself)
    (when (let [parents (set &parents)]
            (and (contains? parents :source-query)
                 (not (contains? parents :joins))))
      (:collection &match))))

(defn- log-aggregation-pipeline
  "Trace-log a pretty-printed copy of the pipeline `form`, with every symbol stripped of its namespace.
  Does nothing when `driver-api/*disable-qp-logging*` is truthy."
  [form]
  (when-not driver-api/*disable-qp-logging*
    (let [strip-namespace (fn [x]
                            (if (symbol? x)
                              (symbol (name x))
                              x))]
      (log/tracef "\nMongo aggregation pipeline:\n%s\n"
                  (u/pprint-to-str 'green (perf/postwalk strip-namespace form))))))

(defn simple-mbql->native
  "Compile a simple (non-nested) MBQL query. Accepts either an outer query (its `:query` is compiled) or an
  inner query directly."
  [query]
  (let [inner-query (or (:query query) query)]
    (generate-aggregation-pipeline inner-query)))

(defn parse-query-string
  "Parse a serialized native query. Like a normal JSON parse, but handles BSON/MongoDB extended JSON forms.

  `s` must be an extended-JSON array of documents; the result is a vector of `org.bson.Document`s. Any failure
  is rethrown as an `ex-info` of type `invalid-query` carrying the offending string under `:query` and the
  original throwable as its cause."
  [^String s]
  (letfn [;; The only way to parse an _ejson array_ with the bson library is `BsonArray/parse`, which yields
          ;; `org.bson.BsonDocument`s. `org.bson.Document` fits our needs better as it (1) implements `Map`
          ;; and (2) converts `BsonValue`s to java types -- so round-trip each element through JSON.
          (bson-value->document [^org.bson.BsonValue bson-value]
            (org.bson.Document/parse (.toJson (.asDocument bson-value))))]
    (try
      (mapv bson-value->document (org.bson.BsonArray/parse s))
      (catch Throwable e
        (throw (ex-info (tru "Unable to parse query: {0}" (.getMessage e))
                        {:type  driver-api/qp.error-type.invalid-query
                         :query s}
                        e))))))

(defn- mbql->native-rec
  "Compile a potentially nested MBQL query.

  Without a `:source-query` this is just `simple-mbql->native`. Otherwise the source query is compiled first:

  * a native source whose `:query` is a string is parsed with `parse-query-string`;
  * any other native source is used as-is;
  * an MBQL source is compiled recursively with `*nesting-level*` incremented.

  The stages for `inner-query` are then added on top of that compiled context, with `*field-mappings*` bound
  to the mappings derived from the source query and its projections."
  [inner-query]
  (if-let [source-query (:source-query inner-query)]
    (let [compiled       (if-let [native-source (:native source-query)]
                           (if (string? (:query native-source))
                             (assoc (dissoc source-query :native)
                                    :query (parse-query-string (:query native-source)))
                             native-source)
                           (binding [*nesting-level* (inc *nesting-level*)]
                             (mbql->native-rec source-query)))
          field-mappings (get-field-mappings source-query (:projections compiled))]
      (binding [*field-mappings* field-mappings]
        (let [outer-ctx (add-aggregation-pipeline inner-query compiled)]
          (merge compiled outer-ctx))))
    (simple-mbql->native inner-query)))

;;; TODO (Cam 6/20/25) -- MongoDB QP code is completely broken and does not consistently look at the keys added
;;; by [[driver-api/add-alias-info]]. Fixing all the busted code above is more work than I want to take on right now, so
;;; until we get around to fixing that let's just walk the query and replace all the non-add-alias-info keys with the
;;; values added by add-alias-info.
;; Walks `form` and rewrites (a) the options of every `:field` clause and (b) the `:alias` of join maps so that
;; they agree with the alias-info keys (`driver-api/qp.add.*`). Each local helper below returns an updated
;; options map, or nil when it has nothing to change.
(defn- HACK-update-aliases [form]
  (letfn [;; When there is a non-empty nfc-path, prefix both the source alias and the desired alias with the
          ;; dot-joined path, and drop the nfc-path key.
          (prepend-nfc-path [{nfc-path      driver-api/qp.add.nfc-path,
                              source-alias  driver-api/qp.add.source-alias,
                              desired-alias driver-api/qp.add.desired-alias,
                              :as           opts}]
            (when (seq nfc-path)
              (let [nfc-path-str (str/join \. nfc-path)]
                (-> opts
                    (assoc driver-api/qp.add.source-alias  (str nfc-path-str \. source-alias)
                           driver-api/qp.add.desired-alias (str nfc-path-str \. desired-alias))
                    (dissoc driver-api/qp.add.nfc-path)))))
          ;; When a source alias is present and differs from `:name`, overwrite `:name` with it.
          (update-name [{field-name :name, source-alias driver-api/qp.add.source-alias, :as opts}]
            (when (and source-alias
                       (not= field-name source-alias))
              (assoc opts :name source-alias)))
          ;; When there is a `:join-alias` but the source table is `driver-api/qp.add.source`, drop `:join-alias`.
          (remove-bad-join-alias [{:keys [join-alias], source-table driver-api/qp.add.source-table, :as opts}]
            (when (and join-alias
                       (= source-table driver-api/qp.add.source))
              (dissoc opts :join-alias)))
          ;; When both `:join-alias` and the source table are present and differ, set `:join-alias` to the
          ;; source table.
          (update-join-alias [{:keys [join-alias], source-table driver-api/qp.add.source-table, :as opts}]
            (when (and join-alias
                       source-table
                       (not= join-alias source-table))
              (assoc opts :join-alias source-table)))
          ;; Apply the helpers in order; a nil result means "no change", so the previous opts are kept.
          ;; NOTE(review): `update-join-alias` is listed twice. The second pass looks redundant (after the
          ;; first one `:join-alias` either equals the source table or has been removed) -- confirm whether
          ;; the duplicate is intentional.
          (update-opts [opts]
            (reduce
             (fn
               [opts f]
               (or (f opts)
                   opts))
             opts
             [prepend-nfc-path
              update-join-alias
              update-name
              remove-bad-join-alias
              update-join-alias]))
          ;; Rebuild a field ref with updated opts. For string-named refs with a source alias, the name is
          ;; replaced by that alias. Note `source-alias` here is read from the ORIGINAL opts, i.e. before
          ;; `update-opts` (and thus before any nfc-path prefix) is applied.
          (update-field-ref [[_tag id-or-name {source-alias driver-api/qp.add.source-alias, :as opts}]]
            (let [opts (update-opts opts)]
              (if (and (string? id-or-name)
                       source-alias)
                [:field source-alias opts]
                [:field id-or-name opts])))]
    (driver-api/replace form
      :field
      (update-field-ref &match)

      ;; a join map whose alias-info alias differs from its `:alias`: overwrite `:alias`, then keep
      ;; replacing inside the updated map
      (join :guard (every-pred map?
                               driver-api/qp.add.alias
                               #(not= (driver-api/qp.add.alias %) (:alias %))))
      (recur (assoc join :alias (driver-api/qp.add.alias join))))))

(defn- preprocess
  "Prepare `inner-query` for compilation: add alias info (with globally unique join aliases), then rewrite the
  query so the legacy keys agree with that alias info."
  [inner-query]
  (let [with-alias-info (driver-api/add-alias-info inner-query {:globally-unique-join-aliases? true})]
    (HACK-update-aliases with-alias-info)))

(defn mbql->native
  "Compile an MBQL query.

  The query is converted to legacy MBQL and its inner `:query` preprocessed; compilation then runs with
  `*query*` bound to that query and a fresh `*next-alias-index*` counter. The result is the compiled pipeline
  context plus `:collection` (the source table's name, or the collection found in a source query when there
  is no source table ID) and `:mbql? true`. The compiled pipeline is also trace-logged."
  [query]
  (let [legacy-query (update (driver-api/->legacy-MBQL query) :query preprocess)]
    (binding [*query*            legacy-query
              *next-alias-index* (volatile! 0)]
      (let [collection-name (if-let [table-id (driver-api/query->source-table-id legacy-query)]
                              (:name (driver-api/table (driver-api/metadata-provider) table-id))
                              (query->collection-name legacy-query))
            result          (mbql->native-rec (:query legacy-query))]
        (log-aggregation-pipeline (:query result))
        (assoc result
               :collection collection-name
               :mbql?      true)))))
